Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement converters for airbyte type/value to SQL #49924

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset
import java.time.ZonedDateTime

// TODO Copied from load-iceberg-parquet. This class needs to be extracted
object TimeStringUtility {

fun toLocalDate(dateString: String): LocalDate {
return LocalDate.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER)
}

fun toLocalDateTime(dateString: String): LocalDateTime {
return LocalDateTime.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER)
}

fun toOffset(timeString: String): LocalTime {
return try {
toMicrosOfDayWithTimezone(timeString)
} catch (e: Exception) {
toMicrosOfDayWithoutTimezone(timeString)
}
}

private fun toMicrosOfDayWithTimezone(timeString: String): LocalTime {
return OffsetTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER).toLocalTime()
}

private fun toMicrosOfDayWithoutTimezone(timeString: String): LocalTime {
return LocalTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER)
}

fun toOffsetDateTime(timestampString: String): OffsetDateTime {
return try {
toOffsetDateTimeWithTimezone(timestampString)
} catch (e: Exception) {
toOffsetDateTimeWithoutTimezone(timestampString)
}
}

private fun toOffsetDateTimeWithTimezone(timestampString: String): OffsetDateTime {
return ZonedDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
.toOffsetDateTime()
}

private fun toOffsetDateTimeWithoutTimezone(timestampString: String): OffsetDateTime {
return LocalDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
.atOffset(ZoneOffset.UTC)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mssql.v2.convert

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn
import io.airbyte.integrations.destination.mssql.v2.model.SqlTable
import java.sql.Types

/** CDK pipeline [AirbyteType] to SQL [Types] converter. */
class AirbyteTypeToSqlType {

/**
* Converts an [AirbyteType] to the associated SQL [Types] value.
*
* @param airbyteSchema The stream's Airbyte schema, represented as an [AirbyteType]
* @return The associated SQL [Types] value.
* @throws IllegalArgumentException if the [AirbyteType] is not supported.
*/
fun convert(airbyteSchema: AirbyteType): Int {
return when (airbyteSchema) {
is ObjectType -> Types.BLOB
is ArrayType -> Types.BLOB
is ArrayTypeWithoutSchema -> Types.BLOB
is BooleanType -> Types.BOOLEAN
is DateType -> Types.DATE
is IntegerType -> Types.BIGINT
is NumberType -> Types.DECIMAL
is ObjectTypeWithEmptySchema -> Types.BLOB
is ObjectTypeWithoutSchema -> Types.BLOB
is StringType -> Types.VARCHAR
is TimeTypeWithTimezone -> Types.TIME_WITH_TIMEZONE
is TimeTypeWithoutTimezone -> Types.TIME
is TimestampTypeWithTimezone -> Types.TIMESTAMP_WITH_TIMEZONE
is TimestampTypeWithoutTimezone -> Types.TIMESTAMP
is UnionType -> Types.BLOB
is UnknownType -> Types.BLOB
}
}
}

/**
* Extension function that converts an [ObjectType] into a [SqlTable] that can be used to define a
* SQL table.
*
* @param primaryKeys The list of configured primary key properties that should be treated as
* primary keys in the generated [SqlTable]
* @return The [SqlTable] that represents the table to be mapped to the stream represented by the
* [ObjectType].
*/
fun ObjectType.toSqlTable(primaryKeys: List<List<String>>): SqlTable {
val identifierFieldNames = primaryKeys.flatten().toSet()
val sqlTypeConverter = AirbyteTypeToSqlType()
val columns =
this.properties.entries.map { (name, field) ->
val isPrimaryKey = identifierFieldNames.contains(name)
val isNullable = !isPrimaryKey && field.nullable
SqlColumn(
name = name,
type = sqlTypeConverter.convert(field.type),
isPrimaryKey = isPrimaryKey,
isNullable = isNullable
)
}
return SqlTable(columns = columns)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mssql.v2.convert

import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDate
import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDateTime
import io.airbyte.cdk.load.data.TimeStringUtility.toOffset
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.util.serializeToJsonBytes
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp

/** CDK pipeline [AirbyteValue] to SQL values converter. */
class AirbyteValueToSqlValue {

/**
* Converts an [AirbyteValue] to the associated SQL value.
*
* @param airbyteValue The [AirbyteValue] from an Airbyte record
* @return The corresponding SQL value for the given [AirbyteValue].
* @throws IllegalArgumentException if the [AirbyteValue] is not supported.
*/
fun convert(airbyteValue: AirbyteValue): Any? {
return when (airbyteValue) {
is ObjectValue -> {
val convertedValues =
airbyteValue.values.entries.associate { (name, value) ->
name to convert(value)
}
convertedValues
}
is ArrayValue -> airbyteValue.values.map { convert(it) }
is BooleanValue -> airbyteValue.value
is DateValue -> Date.valueOf(toLocalDate(airbyteValue.value))
is IntegerValue -> airbyteValue.value
is NullValue -> null
is NumberValue -> airbyteValue.value.toDouble().toBigDecimal()
is StringValue -> airbyteValue.value
is TimeValue -> Time.valueOf(toOffset(airbyteValue.value))
is TimestampValue -> Timestamp.valueOf(toLocalDateTime(airbyteValue.value))
is UnknownValue -> airbyteValue.value.serializeToJsonBytes()
}
}
}

/**
* Extension function that converts an [ObjectValue] into a row of SQL values.
*
* @param sqlTable The [SqlTable] that contains data type information for each column. This is used
* to filter the [ObjectValue]'s values to only those that exist in the table.
* @return A [SqlTableRow] that contains values converted to their SQL data type equivalents from
* the provided [ObjectValue].
*/
fun ObjectValue.toSqlValue(sqlTable: SqlTable): SqlTableRow {
val converter = AirbyteValueToSqlValue()
return SqlTableRow(
values =
this.values
.filter { (name, _) -> sqlTable.columns.find { it.name == name } != null }
.map { (name, value) ->
val dataType = sqlTable.columns.find { it.name == name }!!.type
val converted =
when (value) {
is ObjectValue ->
(converter.convert(value) as LinkedHashMap<*, *>)
.serializeToJsonBytes()
is ArrayValue ->
(converter.convert(value) as List<*>).serializeToJsonBytes()
else -> converter.convert(value)
}
SqlTableRowValue(name = name, value = converted, type = dataType)
}
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mssql.v2.model

import java.sql.Types

/**
* Representation of a colum in a SQL table.
*
* @param name The name of the column
* @param type The data type of the column (see [Types] for values).
* @param isPrimaryKey Whether the column represents a primary key.
* @param isNullable Whether the column's value supports null values.
*/
data class SqlColumn(
val name: String,
val type: Int,
val isPrimaryKey: Boolean = false,
val isNullable: Boolean = false
)

/**
* Representation of a SQL table.
*
* @param columns The list of columns in the table.
*/
data class SqlTable(val columns: List<SqlColumn>)

/**
* Representation of a value in a SQL row/column cell.
*
* @param name The name of the column.
* @param value The value of the row/column cell.
* @param type The SQL type of the row/column cell (see [Types] for values).
*/
data class SqlTableRowValue(val name: String, val value: Any?, val type: Int)

/**
* Representation of a row of values in a SQL table.
*
* @param values A list of values stored in the row.
*/
data class SqlTableRow(val values: List<SqlTableRowValue>)
Loading