Skip to content

Commit

Permalink
feat: implement converters for airbyte type/value to SQL (#49924)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Dec 19, 2024
1 parent d0049b9 commit 9cc3650
Show file tree
Hide file tree
Showing 6 changed files with 710 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset
import java.time.ZonedDateTime

// TODO Copied from load-iceberg-parquet. This class needs to be extracted
object TimeStringUtility {

fun toLocalDate(dateString: String): LocalDate {
return LocalDate.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER)
}

fun toLocalDateTime(dateString: String): LocalDateTime {
return LocalDateTime.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER)
}

fun toOffset(timeString: String): LocalTime {
return try {
toMicrosOfDayWithTimezone(timeString)
} catch (e: Exception) {
toMicrosOfDayWithoutTimezone(timeString)
}
}

private fun toMicrosOfDayWithTimezone(timeString: String): LocalTime {
return OffsetTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER).toLocalTime()
}

private fun toMicrosOfDayWithoutTimezone(timeString: String): LocalTime {
return LocalTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER)
}

fun toOffsetDateTime(timestampString: String): OffsetDateTime {
return try {
toOffsetDateTimeWithTimezone(timestampString)
} catch (e: Exception) {
toOffsetDateTimeWithoutTimezone(timestampString)
}
}

private fun toOffsetDateTimeWithTimezone(timestampString: String): OffsetDateTime {
return ZonedDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
.toOffsetDateTime()
}

private fun toOffsetDateTimeWithoutTimezone(timestampString: String): OffsetDateTime {
return LocalDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
.atOffset(ZoneOffset.UTC)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mssql.v2.convert

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn
import io.airbyte.integrations.destination.mssql.v2.model.SqlTable
import java.sql.Types

/** CDK pipeline [AirbyteType] to SQL [Types] converter. */
class AirbyteTypeToSqlType {

/**
* Converts an [AirbyteType] to the associated SQL [Types] value.
*
* @param airbyteSchema The stream's Airbyte schema, represented as an [AirbyteType]
* @return The associated SQL [Types] value.
* @throws IllegalArgumentException if the [AirbyteType] is not supported.
*/
fun convert(airbyteSchema: AirbyteType): Int {
return when (airbyteSchema) {
is ObjectType -> Types.BLOB
is ArrayType -> Types.BLOB
is ArrayTypeWithoutSchema -> Types.BLOB
is BooleanType -> Types.BOOLEAN
is DateType -> Types.DATE
is IntegerType -> Types.BIGINT
is NumberType -> Types.DECIMAL
is ObjectTypeWithEmptySchema -> Types.BLOB
is ObjectTypeWithoutSchema -> Types.BLOB
is StringType -> Types.VARCHAR
is TimeTypeWithTimezone -> Types.TIME_WITH_TIMEZONE
is TimeTypeWithoutTimezone -> Types.TIME
is TimestampTypeWithTimezone -> Types.TIMESTAMP_WITH_TIMEZONE
is TimestampTypeWithoutTimezone -> Types.TIMESTAMP
is UnionType -> Types.BLOB
is UnknownType -> Types.BLOB
}
}
}

/**
* Extension function that converts an [ObjectType] into a [SqlTable] that can be used to define a
* SQL table.
*
* @param primaryKeys The list of configured primary key properties that should be treated as
* primary keys in the generated [SqlTable]
* @return The [SqlTable] that represents the table to be mapped to the stream represented by the
* [ObjectType].
*/
fun ObjectType.toSqlTable(primaryKeys: List<List<String>>): SqlTable {
val identifierFieldNames = primaryKeys.flatten().toSet()
val sqlTypeConverter = AirbyteTypeToSqlType()
val columns =
this.properties.entries.map { (name, field) ->
val isPrimaryKey = identifierFieldNames.contains(name)
val isNullable = !isPrimaryKey && field.nullable
SqlColumn(
name = name,
type = sqlTypeConverter.convert(field.type),
isPrimaryKey = isPrimaryKey,
isNullable = isNullable
)
}
return SqlTable(columns = columns)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mssql.v2.convert

import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDate
import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDateTime
import io.airbyte.cdk.load.data.TimeStringUtility.toOffset
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.util.serializeToJsonBytes
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp

/** CDK pipeline [AirbyteValue] to SQL values converter. */
class AirbyteValueToSqlValue {

/**
* Converts an [AirbyteValue] to the associated SQL value.
*
* @param airbyteValue The [AirbyteValue] from an Airbyte record
* @return The corresponding SQL value for the given [AirbyteValue].
* @throws IllegalArgumentException if the [AirbyteValue] is not supported.
*/
fun convert(airbyteValue: AirbyteValue): Any? {
return when (airbyteValue) {
is ObjectValue -> {
val convertedValues =
airbyteValue.values.entries.associate { (name, value) ->
name to convert(value)
}
convertedValues
}
is ArrayValue -> airbyteValue.values.map { convert(it) }
is BooleanValue -> airbyteValue.value
is DateValue -> Date.valueOf(toLocalDate(airbyteValue.value))
is IntegerValue -> airbyteValue.value
is NullValue -> null
is NumberValue -> airbyteValue.value.toDouble().toBigDecimal()
is StringValue -> airbyteValue.value
is TimeValue -> Time.valueOf(toOffset(airbyteValue.value))
is TimestampValue -> Timestamp.valueOf(toLocalDateTime(airbyteValue.value))
is UnknownValue -> airbyteValue.value.serializeToJsonBytes()
}
}
}

/**
* Extension function that converts an [ObjectValue] into a row of SQL values.
*
* @param sqlTable The [SqlTable] that contains data type information for each column. This is used
* to filter the [ObjectValue]'s values to only those that exist in the table.
* @return A [SqlTableRow] that contains values converted to their SQL data type equivalents from
* the provided [ObjectValue].
*/
fun ObjectValue.toSqlValue(sqlTable: SqlTable): SqlTableRow {
val converter = AirbyteValueToSqlValue()
return SqlTableRow(
values =
this.values
.filter { (name, _) -> sqlTable.columns.find { it.name == name } != null }
.map { (name, value) ->
val dataType = sqlTable.columns.find { it.name == name }!!.type
val converted =
when (value) {
is ObjectValue ->
(converter.convert(value) as LinkedHashMap<*, *>)
.serializeToJsonBytes()
is ArrayValue ->
(converter.convert(value) as List<*>).serializeToJsonBytes()
else -> converter.convert(value)
}
SqlTableRowValue(name = name, value = converted, type = dataType)
}
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mssql.v2.model

import java.sql.Types

/**
* Representation of a colum in a SQL table.
*
* @param name The name of the column
* @param type The data type of the column (see [Types] for values).
* @param isPrimaryKey Whether the column represents a primary key.
* @param isNullable Whether the column's value supports null values.
*/
data class SqlColumn(
val name: String,
val type: Int,
val isPrimaryKey: Boolean = false,
val isNullable: Boolean = false
)

/**
* Representation of a SQL table.
*
* @param columns The list of columns in the table.
*/
data class SqlTable(val columns: List<SqlColumn>)

/**
* Representation of a value in a SQL row/column cell.
*
* @param name The name of the column.
* @param value The value of the row/column cell.
* @param type The SQL type of the row/column cell (see [Types] for values).
*/
data class SqlTableRowValue(val name: String, val value: Any?, val type: Int)

/**
* Representation of a row of values in a SQL table.
*
* @param values A list of values stored in the row.
*/
data class SqlTableRow(val values: List<SqlTableRowValue>)
Loading

0 comments on commit 9cc3650

Please sign in to comment.