-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement converters for airbyte type/value to SQL (#49924)
- Loading branch information
1 parent
4bec6b3
commit 7b0ad97
Showing
6 changed files
with
710 additions
and
0 deletions.
There are no files selected for viewing
59 changes: 59 additions & 0 deletions
59
...ectors/destination-mssql-v2/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringUtility.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.data | ||
|
||
import java.time.LocalDate | ||
import java.time.LocalDateTime | ||
import java.time.LocalTime | ||
import java.time.OffsetDateTime | ||
import java.time.OffsetTime | ||
import java.time.ZoneOffset | ||
import java.time.ZonedDateTime | ||
|
||
// TODO Copied from load-iceberg-parquet. This class needs to be extracted | ||
/**
 * Helpers that parse date/time strings into `java.time` values using the formatters declared on
 * [TimeStringToInteger].
 */
object TimeStringUtility {

    /** Parses [dateString] into a [LocalDate] using [TimeStringToInteger.DATE_TIME_FORMATTER]. */
    fun toLocalDate(dateString: String): LocalDate =
        LocalDate.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER)

    /**
     * Parses [dateString] into a [LocalDateTime] using
     * [TimeStringToInteger.DATE_TIME_FORMATTER].
     */
    fun toLocalDateTime(dateString: String): LocalDateTime =
        LocalDateTime.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER)

    /**
     * Parses [timeString] into a [LocalTime].
     *
     * The string is first parsed as a time carrying an offset; when that fails it is parsed as a
     * plain local time. An offset, when present, is discarded rather than applied: the local time
     * component is returned as written.
     *
     * @param timeString The time string to parse.
     * @return The local time component of [timeString].
     * @throws java.time.format.DateTimeParseException if [timeString] matches neither form.
     */
    fun toOffset(timeString: String): LocalTime =
        try {
            toLocalTimeWithTimezone(timeString)
        } catch (e: java.time.format.DateTimeParseException) {
            // Only a parse failure means "no offset present"; anything else should propagate.
            toLocalTimeWithoutTimezone(timeString)
        }

    /** Parses a time string that carries an offset and returns only its local time component. */
    private fun toLocalTimeWithTimezone(timeString: String): LocalTime =
        OffsetTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER).toLocalTime()

    /** Parses a time string that carries no offset. */
    private fun toLocalTimeWithoutTimezone(timeString: String): LocalTime =
        LocalTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER)

    /**
     * Parses [timestampString] into an [OffsetDateTime].
     *
     * The string is first parsed as a zoned timestamp; when that fails it is parsed as a local
     * timestamp and placed at [ZoneOffset.UTC].
     *
     * @param timestampString The timestamp string to parse.
     * @return The parsed timestamp, at UTC when the input carries no zone information.
     * @throws java.time.format.DateTimeParseException if [timestampString] matches neither form.
     */
    fun toOffsetDateTime(timestampString: String): OffsetDateTime =
        try {
            toOffsetDateTimeWithTimezone(timestampString)
        } catch (e: java.time.format.DateTimeParseException) {
            // Only a parse failure means "no zone present"; anything else should propagate.
            toOffsetDateTimeWithoutTimezone(timestampString)
        }

    /** Parses a timestamp string that carries zone information. */
    private fun toOffsetDateTimeWithTimezone(timestampString: String): OffsetDateTime =
        ZonedDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
            .toOffsetDateTime()

    /** Parses a timestamp string without zone information and places it at UTC. */
    private fun toOffsetDateTimeWithoutTimezone(timestampString: String): OffsetDateTime =
        LocalDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
            .atOffset(ZoneOffset.UTC)
}
84 changes: 84 additions & 0 deletions
84
.../main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.mssql.v2.convert | ||
|
||
import io.airbyte.cdk.load.data.AirbyteType | ||
import io.airbyte.cdk.load.data.ArrayType | ||
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema | ||
import io.airbyte.cdk.load.data.BooleanType | ||
import io.airbyte.cdk.load.data.DateType | ||
import io.airbyte.cdk.load.data.IntegerType | ||
import io.airbyte.cdk.load.data.NumberType | ||
import io.airbyte.cdk.load.data.ObjectType | ||
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema | ||
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema | ||
import io.airbyte.cdk.load.data.StringType | ||
import io.airbyte.cdk.load.data.TimeTypeWithTimezone | ||
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone | ||
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone | ||
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone | ||
import io.airbyte.cdk.load.data.UnionType | ||
import io.airbyte.cdk.load.data.UnknownType | ||
import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn | ||
import io.airbyte.integrations.destination.mssql.v2.model.SqlTable | ||
import java.sql.Types | ||
|
||
/** Maps CDK pipeline [AirbyteType] instances to `java.sql` [Types] constants. */
class AirbyteTypeToSqlType {

    /**
     * Resolves the SQL [Types] constant that corresponds to the given [AirbyteType].
     *
     * Every [AirbyteType] variant is handled, so the lookup always yields a value.
     *
     * @param airbyteSchema The stream's Airbyte schema, represented as an [AirbyteType]
     * @return The associated SQL [Types] value.
     */
    fun convert(airbyteSchema: AirbyteType): Int =
        when (airbyteSchema) {
            // Object, array, union and unknown variants all map to BLOB.
            is ObjectType,
            is ObjectTypeWithEmptySchema,
            is ObjectTypeWithoutSchema,
            is ArrayType,
            is ArrayTypeWithoutSchema,
            is UnionType,
            is UnknownType -> Types.BLOB
            // Scalar variants.
            is BooleanType -> Types.BOOLEAN
            is IntegerType -> Types.BIGINT
            is NumberType -> Types.DECIMAL
            is StringType -> Types.VARCHAR
            // Temporal variants.
            is DateType -> Types.DATE
            is TimeTypeWithTimezone -> Types.TIME_WITH_TIMEZONE
            is TimeTypeWithoutTimezone -> Types.TIME
            is TimestampTypeWithTimezone -> Types.TIMESTAMP_WITH_TIMEZONE
            is TimestampTypeWithoutTimezone -> Types.TIMESTAMP
        }
}
|
||
/**
 * Builds the [SqlTable] definition that corresponds to this [ObjectType].
 *
 * Each property becomes one [SqlColumn]. The primary key paths are flattened into a set of names,
 * and a property whose name is in that set is marked as a primary key and is never nullable.
 *
 * @param primaryKeys The list of configured primary key properties that should be treated as
 * primary keys in the generated [SqlTable]
 * @return The [SqlTable] that represents the table to be mapped to the stream represented by the
 * [ObjectType].
 */
fun ObjectType.toSqlTable(primaryKeys: List<List<String>>): SqlTable {
    val typeConverter = AirbyteTypeToSqlType()
    val keyNames = primaryKeys.flatten().toSet()
    return SqlTable(
        columns =
            properties.entries.map { property ->
                val columnName = property.key
                val definition = property.value
                val partOfKey = columnName in keyNames
                // A primary key column is never nullable, whatever the field itself declares.
                val nullable = !partOfKey && definition.nullable
                SqlColumn(
                    name = columnName,
                    type = typeConverter.convert(definition.type),
                    isPrimaryKey = partOfKey,
                    isNullable = nullable
                )
            }
    )
}
88 changes: 88 additions & 0 deletions
88
...ain/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.mssql.v2.convert | ||
|
||
import io.airbyte.cdk.load.data.AirbyteValue | ||
import io.airbyte.cdk.load.data.ArrayValue | ||
import io.airbyte.cdk.load.data.BooleanValue | ||
import io.airbyte.cdk.load.data.DateValue | ||
import io.airbyte.cdk.load.data.IntegerValue | ||
import io.airbyte.cdk.load.data.NullValue | ||
import io.airbyte.cdk.load.data.NumberValue | ||
import io.airbyte.cdk.load.data.ObjectValue | ||
import io.airbyte.cdk.load.data.StringValue | ||
import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDate | ||
import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDateTime | ||
import io.airbyte.cdk.load.data.TimeStringUtility.toOffset | ||
import io.airbyte.cdk.load.data.TimeValue | ||
import io.airbyte.cdk.load.data.TimestampValue | ||
import io.airbyte.cdk.load.data.UnknownValue | ||
import io.airbyte.cdk.load.util.serializeToJsonBytes | ||
import java.sql.Date | ||
import java.sql.Time | ||
import java.sql.Timestamp | ||
|
||
/** Converts CDK pipeline [AirbyteValue] instances into values suitable for SQL statements. */
class AirbyteValueToSqlValue {

    /**
     * Produces the SQL-side representation of the given [AirbyteValue].
     *
     * Object and array values are converted recursively, element by element; they are returned as
     * a map and a list respectively and are not serialized here.
     *
     * @param airbyteValue The [AirbyteValue] from an Airbyte record
     * @return The corresponding SQL value for the given [AirbyteValue], or `null` for a
     * [NullValue].
     */
    fun convert(airbyteValue: AirbyteValue): Any? =
        when (airbyteValue) {
            is ObjectValue ->
                airbyteValue.values.entries.associate { entry ->
                    entry.key to convert(entry.value)
                }
            is ArrayValue -> airbyteValue.values.map { element -> convert(element) }
            is BooleanValue -> airbyteValue.value
            is DateValue -> {
                val localDate = toLocalDate(airbyteValue.value)
                Date.valueOf(localDate)
            }
            is IntegerValue -> airbyteValue.value
            is NullValue -> null
            // NOTE(review): the number takes a round trip through Double before becoming a
            // BigDecimal; this may lose precision for high-precision values. Confirm it is intended.
            is NumberValue -> airbyteValue.value.toDouble().toBigDecimal()
            is StringValue -> airbyteValue.value
            is TimeValue -> {
                // NOTE(review): converted via a LocalTime, so any offset in the source string is
                // not carried into the SQL value. Confirm it is intended.
                val localTime = toOffset(airbyteValue.value)
                Time.valueOf(localTime)
            }
            is TimestampValue -> {
                // NOTE(review): parsed as a LocalDateTime, so any offset in the source string is
                // not carried into the SQL value. Confirm it is intended.
                val localDateTime = toLocalDateTime(airbyteValue.value)
                Timestamp.valueOf(localDateTime)
            }
            is UnknownValue -> airbyteValue.value.serializeToJsonBytes()
        }
}
|
||
/**
 * Extension function that converts an [ObjectValue] into a row of SQL values.
 *
 * Only values whose name matches a column of [sqlTable] are kept; all others are dropped. Nested
 * [ObjectValue] and [ArrayValue] entries are converted and then serialized to JSON bytes; every
 * other value is the direct result of [AirbyteValueToSqlValue.convert].
 *
 * @param sqlTable The [SqlTable] that contains data type information for each column. This is used
 * to filter the [ObjectValue]'s values to only those that exist in the table.
 * @return A [SqlTableRow] that contains values converted to their SQL data type equivalents from
 * the provided [ObjectValue].
 */
fun ObjectValue.toSqlValue(sqlTable: SqlTable): SqlTableRow {
    val converter = AirbyteValueToSqlValue()
    // Index the column types by name once, instead of scanning the column list twice per value.
    // The first column wins when a name is repeated, matching a front-to-back search.
    val columnTypes: Map<String, Int> =
        buildMap<String, Int> {
            sqlTable.columns.forEach { column -> getOrPut(column.name) { column.type } }
        }
    return SqlTableRow(
        values =
            this.values.mapNotNull { (name, value) ->
                // Values without a matching column are not part of the row.
                val dataType = columnTypes[name] ?: return@mapNotNull null
                val converted =
                    when (value) {
                        // Nested structures are stored as serialized JSON bytes.
                        is ObjectValue ->
                            (converter.convert(value) as Map<*, *>).serializeToJsonBytes()
                        is ArrayValue ->
                            (converter.convert(value) as List<*>).serializeToJsonBytes()
                        else -> converter.convert(value)
                    }
                SqlTableRowValue(name = name, value = converted, type = dataType)
            }
    )
}
45 changes: 45 additions & 0 deletions
45
...-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.mssql.v2.model | ||
|
||
import java.sql.Types | ||
|
||
/**
 * Describes a single column of a SQL table.
 *
 * @property name The name of the column.
 * @property type The data type of the column, as one of the [Types] constants.
 * @property isPrimaryKey Whether the column is part of the primary key. Defaults to `false`.
 * @property isNullable Whether the column accepts null values. Defaults to `false`.
 */
data class SqlColumn(
    val name: String,
    val type: Int,
    val isPrimaryKey: Boolean = false,
    val isNullable: Boolean = false,
)
|
||
/**
 * Describes a SQL table as the list of its columns.
 *
 * @property columns The columns that make up the table.
 */
data class SqlTable(
    val columns: List<SqlColumn>,
)
|
||
/**
 * Holds the value of a single row/column cell of a SQL table.
 *
 * @property name The name of the column the cell belongs to.
 * @property value The value of the cell; may be `null`.
 * @property type The SQL type of the cell, as one of the [Types] constants.
 */
data class SqlTableRowValue(
    val name: String,
    val value: Any?,
    val type: Int,
)
|
||
/**
 * Holds one row of a SQL table as the list of its cell values.
 *
 * @property values The cell values stored in the row.
 */
data class SqlTableRow(
    val values: List<SqlTableRowValue>,
)
Oops, something went wrong.