Skip to content

Commit

Permalink
source-mysql: normalize class names
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar committed Dec 19, 2024
1 parent ebbbb9d commit 420477d
Show file tree
Hide file tree
Showing 28 changed files with 427 additions and 455 deletions.
4 changes: 1 addition & 3 deletions airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

application {
mainClass = 'io.airbyte.integrations.source.mysql.MysqlSource'
mainClass = 'io.airbyte.integrations.source.mysql.MySqlSource'
}

airbyteBulkConnector {
Expand All @@ -14,10 +14,8 @@ airbyteBulkConnector {

dependencies {
implementation 'com.mysql:mysql-connector-j:9.1.0'
implementation 'org.codehaus.plexus:plexus-utils:4.0.0'
implementation 'io.debezium:debezium-connector-mysql'

testImplementation platform('org.testcontainers:testcontainers-bom:1.20.2')
testImplementation 'org.testcontainers:mysql'
testImplementation("io.mockk:mockk:1.12.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.AirbyteSourceRunner

object MysqlSource {
object MySqlSource {
@JvmStatic
fun main(args: Array<String>) {
AirbyteSourceRunner.run(*args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc.converters
package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.data.LocalDateCodec
import io.airbyte.cdk.data.LocalDateTimeCodec
Expand All @@ -20,7 +20,52 @@ import java.time.LocalTime
import java.time.ZonedDateTime
import org.apache.kafka.connect.data.SchemaBuilder

class MySqlTemporalConverter : RelationalColumnCustomConverter {
class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter {

override val debeziumPropertiesKey: String = "boolean"
override val handlers: List<RelationalColumnCustomConverter.Handler> =
listOf(booleanHandler, tinyint1Handler)

companion object {
val booleanHandler =
RelationalColumnCustomConverter.Handler(
predicate = { it.typeName().startsWith("BOOL", ignoreCase = true) },
outputSchema = SchemaBuilder.bool(),
partialConverters =
listOf(
PartialConverter {
when (it) {
null -> Converted(false)
is Boolean -> Converted(it)
else -> NoConversion
}
}
)
)

val tinyint1Handler =
RelationalColumnCustomConverter.Handler(
predicate = {
it.typeName().equals("TINYINT", ignoreCase = true) &&
it.length().isPresent &&
it.length().asInt == 1
},
outputSchema = SchemaBuilder.bool(),
partialConverters =
listOf(
PartialConverter {
when (it) {
null -> Converted(false)
is Number -> Converted(it != 0)
else -> NoConversion
}
}
)
)
}
}

class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val debeziumPropertiesKey: String = "temporal"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.util.Jsons

data class MysqlCdcInitialSnapshotStateValue(
data class MySqlSourceCdcInitialSnapshotStateValue(
@JsonProperty("pk_val") val pkVal: String? = null,
@JsonProperty("pk_name") val pkName: String? = null,
@JsonProperty("version") val version: Int? = null,
Expand All @@ -25,7 +25,7 @@ data class MysqlCdcInitialSnapshotStateValue(
/** Value representing the completion of a FULL_REFRESH snapshot. */
fun getSnapshotCompletedState(stream: Stream): OpaqueStateValue =
Jsons.valueToTree(
MysqlCdcInitialSnapshotStateValue(
MySqlSourceCdcInitialSnapshotStateValue(
streamName = stream.name,
cursorField = listOf(),
streamNamespace = stream.namespace
Expand All @@ -42,7 +42,7 @@ data class MysqlCdcInitialSnapshotStateValue(
true -> Jsons.nullNode()
false ->
Jsons.valueToTree(
MysqlCdcInitialSnapshotStateValue(
MySqlSourceCdcInitialSnapshotStateValue(
pkName = primaryKeyField.id,
pkVal = primaryKeyCheckpoint.first().asText(),
stateType = "primary_key",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.airbyte.cdk.discover.CdcStringMetaFieldType
import io.airbyte.cdk.discover.FieldType
import io.airbyte.cdk.discover.MetaField

enum class MysqlCdcMetaFields(
enum class MySqlSourceCdcMetaFields(
override val type: FieldType,
) : MetaField {
CDC_CURSOR(CdcIntegerMetaFieldType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc
package io.airbyte.integrations.source.mysql

import kotlin.io.path.Path
import kotlin.io.path.extension

/** WAL position datum for MySQL. */
data class MySqlPosition(val fileName: String, val position: Long) : Comparable<MySqlPosition> {
data class MySqlSourceCdcPosition(val fileName: String, val position: Long) :
Comparable<MySqlSourceCdcPosition> {

/**
* Numerical value encoded in the extension of the binlog file name.
Expand All @@ -31,5 +32,6 @@ data class MySqlPosition(val fileName: String, val position: Long) : Comparable<
val cursorValue: Long
get() = (fileExtension.toLong() shl Int.SIZE_BITS) or position

override fun compareTo(other: MySqlPosition): Int = cursorValue.compareTo(other.cursorValue)
override fun compareTo(other: MySqlSourceCdcPosition): Int =
cursorValue.compareTo(other.cursorValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.time.Duration

private val log = KotlinLogging.logger {}

/** Mysql-specific implementation of [SourceConfiguration] */
data class MysqlSourceConfiguration(
/** MySQL-specific implementation of [SourceConfiguration] */
data class MySqlSourceConfiguration(
override val realHost: String,
override val realPort: Int,
override val sshTunnel: SshTunnelMethodConfiguration?,
Expand All @@ -41,16 +41,16 @@ data class MysqlSourceConfiguration(
) : JdbcSourceConfiguration, CdcSourceConfiguration {
override val global = incrementalConfiguration is CdcIncrementalConfiguration

/** Required to inject [MysqlSourceConfiguration] directly. */
/** Required to inject [MySqlSourceConfiguration] directly. */
@Factory
private class MicronautFactory {
@Singleton
fun mysqlSourceConfig(
factory:
SourceConfigurationFactory<
MysqlSourceConfigurationSpecification, MysqlSourceConfiguration>,
supplier: ConfigurationSpecificationSupplier<MysqlSourceConfigurationSpecification>,
): MysqlSourceConfiguration = factory.make(supplier.get())
MySqlSourceConfigurationSpecification, MySqlSourceConfiguration>,
supplier: ConfigurationSpecificationSupplier<MySqlSourceConfigurationSpecification>,
): MySqlSourceConfiguration = factory.make(supplier.get())
}
}

Expand All @@ -71,14 +71,14 @@ enum class InvalidCdcCursorPositionBehavior {
}

@Singleton
class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MysqlSourceConfigurationSpecification, MysqlSourceConfiguration> {
class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MySqlSourceConfigurationSpecification, MySqlSourceConfiguration> {

constructor() : this(emptySet())

override fun makeWithoutExceptionHandling(
pojo: MysqlSourceConfigurationSpecification,
): MysqlSourceConfiguration {
pojo: MySqlSourceConfigurationSpecification,
): MySqlSourceConfiguration {
val realHost: String = pojo.host
val realPort: Int = pojo.port
val sshTunnel: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue()
Expand Down Expand Up @@ -115,20 +115,21 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
"SSL encryption or an SSH tunnel."
)
}
MysqlJdbcEncryption(sslMode = SSLMode.PREFERRED)
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.PREFERRED)
}
is EncryptionRequired -> MysqlJdbcEncryption(sslMode = SSLMode.REQUIRED)
is EncryptionRequired ->
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.REQUIRED)
is SslVerifyCertificate ->
MysqlJdbcEncryption(
sslMode = SSLMode.VERIFY_CA,
MySqlSourceEncryption(
sslMode = MySqlSourceEncryption.SslMode.VERIFY_CA,
caCertificate = encryption.sslCertificate,
clientCertificate = encryption.sslClientCertificate,
clientKey = encryption.sslClientKey,
clientKeyPassword = encryption.sslClientPassword
)
is SslVerifyIdentity ->
MysqlJdbcEncryption(
sslMode = SSLMode.VERIFY_IDENTITY,
MySqlSourceEncryption(
sslMode = MySqlSourceEncryption.SslMode.VERIFY_IDENTITY,
caCertificate = encryption.sslCertificate,
clientCertificate = encryption.sslClientCertificate,
clientKey = encryption.sslClientKey,
Expand Down Expand Up @@ -178,7 +179,7 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
},
)
}
return MysqlSourceConfiguration(
return MySqlSourceConfiguration(
realHost = realHost,
realPort = realPort,
sshTunnel = sshTunnel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import io.micronaut.context.annotation.ConfigurationProperties
import jakarta.inject.Singleton

/**
* The object which is mapped to the Mysql source configuration JSON.
* The object which is mapped to the MySQL source configuration JSON.
*
* Use [MysqlSourceConfiguration] instead wherever possible. This object also allows injecting
* Use [MySqlSourceConfiguration] instead wherever possible. This object also allows injecting
* values through Micronaut properties, this is made possible by the classes named
* `MicronautPropertiesFriendly.*`.
*/
@JsonSchemaTitle("Mysql Source Spec")
@JsonSchemaTitle("MySQL Source Spec")
@JsonPropertyOrder(
value = ["host", "port", "database", "username", "replication_method"],
)
@Singleton
@ConfigurationProperties(CONNECTOR_CONFIG_PREFIX)
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
class MysqlSourceConfigurationSpecification : ConfigurationSpecification() {
class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
@JsonProperty("host")
@JsonSchemaTitle("Host")
@JsonSchemaInject(json = """{"order":1}""")
Expand Down Expand Up @@ -320,7 +320,7 @@ data object UserDefinedCursor : CursorMethodConfiguration
@JsonSchemaTitle("Read Changes using Change Data Capture (CDC)")
@JsonSchemaDescription(
"<i>Recommended</i> - " +
"Incrementally reads new inserts, updates, and deletes using Mysql's <a href=" +
"Incrementally reads new inserts, updates, and deletes using MySQL's <a href=" +
"\"https://docs.airbyte.com/integrations/sources/mssql/#change-data-capture-cdc\"" +
"> change data capture feature</a>. This must be enabled on your database.",
)
Expand Down
Loading

0 comments on commit 420477d

Please sign in to comment.