Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-mysql: normalize class names #49939

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ plugins {
}

application {
mainClass = 'io.airbyte.integrations.source.mysql.MysqlSource'
mainClass = 'io.airbyte.integrations.source.mysql.MySqlSource'
}

airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = 'local'
cdk = '0.226'
}

dependencies {
implementation 'com.mysql:mysql-connector-j:9.1.0'
implementation 'org.codehaus.plexus:plexus-utils:4.0.0'
implementation 'io.debezium:debezium-connector-mysql'

testImplementation platform('org.testcontainers:testcontainers-bom:1.20.2')
testImplementation 'org.testcontainers:mysql'
testImplementation("io.mockk:mockk:1.12.0")
}
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
data:
ab_internal:
ql: 200
sl: 0
ql: 400
sl: 300
allowedHosts:
hosts:
- ${host}
- ${tunnel_method.tunnel_host}
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.3
dockerImageTag: 3.9.4
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.AirbyteSourceRunner

object MysqlSource {
object MySqlSource {
@JvmStatic
fun main(args: Array<String>) {
AirbyteSourceRunner.run(*args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc.converters
package io.airbyte.integrations.source.mysql

import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.util.*
import org.apache.kafka.connect.data.SchemaBuilder

class MySQLBooleanConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
class MySqlSourceCdcBooleanConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
override fun configure(props: Properties?) {}

private val BOOLEAN_TYPES = arrayOf("BOOLEAN", "BOOL", "TINYINT")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.util.Jsons

data class MysqlCdcInitialSnapshotStateValue(
data class MySqlSourceCdcInitialSnapshotStateValue(
@JsonProperty("pk_val") val pkVal: String? = null,
@JsonProperty("pk_name") val pkName: String? = null,
@JsonProperty("version") val version: Int? = null,
Expand All @@ -25,7 +25,7 @@ data class MysqlCdcInitialSnapshotStateValue(
/** Value representing the completion of a FULL_REFRESH snapshot. */
fun getSnapshotCompletedState(stream: Stream): OpaqueStateValue =
Jsons.valueToTree(
MysqlCdcInitialSnapshotStateValue(
MySqlSourceCdcInitialSnapshotStateValue(
streamName = stream.name,
cursorField = listOf(),
streamNamespace = stream.namespace
Expand All @@ -42,7 +42,7 @@ data class MysqlCdcInitialSnapshotStateValue(
true -> Jsons.nullNode()
false ->
Jsons.valueToTree(
MysqlCdcInitialSnapshotStateValue(
MySqlSourceCdcInitialSnapshotStateValue(
pkName = primaryKeyField.id,
pkVal = primaryKeyCheckpoint.first().asText(),
stateType = "primary_key",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.airbyte.cdk.discover.CdcStringMetaFieldType
import io.airbyte.cdk.discover.FieldType
import io.airbyte.cdk.discover.MetaField

enum class MysqlCdcMetaFields(
enum class MySqlSourceCdcMetaFields(
override val type: FieldType,
) : MetaField {
CDC_CURSOR(CdcIntegerMetaFieldType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc.converters
package io.airbyte.integrations.source.mysql

import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.util.*
import org.apache.kafka.connect.data.SchemaBuilder

class MySQLNumericConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
class MySqlSourceCdcNumericConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
override fun configure(props: Properties?) {}

private val NUMERIC_TYPES = arrayOf("FLOAT", "DOUBLE", "DECIMAL")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc
package io.airbyte.integrations.source.mysql

import kotlin.io.path.Path
import kotlin.io.path.extension

/** WAL position datum for MySQL. */
data class MySqlPosition(val fileName: String, val position: Long) : Comparable<MySqlPosition> {
data class MySqlSourceCdcPosition(val fileName: String, val position: Long) :
Comparable<MySqlSourceCdcPosition> {

/**
* Numerical value encoded in the extension of the binlog file name.
Expand All @@ -31,5 +32,6 @@ data class MySqlPosition(val fileName: String, val position: Long) : Comparable<
val cursorValue: Long
get() = (fileExtension.toLong() shl Int.SIZE_BITS) or position

override fun compareTo(other: MySqlPosition): Int = cursorValue.compareTo(other.cursorValue)
override fun compareTo(other: MySqlSourceCdcPosition): Int =
cursorValue.compareTo(other.cursorValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.cdc.converters
package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.jdbc.converters.DateTimeConverter
import io.debezium.spi.converter.CustomConverter
Expand All @@ -24,7 +24,7 @@ import org.apache.kafka.connect.data.SchemaBuilder
* MySqlCdcProperties#commonProperties(JdbcDatabase)} (If you don't rename, a test would still fail
* but it might be tricky to figure out where to change the property name)
*/
class MySQLDateTimeConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
class MySqlSourceCdcTemporalConverter : CustomConverter<SchemaBuilder, RelationalColumn> {

private val DATE_TYPES = arrayOf("DATE", "DATETIME", "TIME", "TIMESTAMP")
override fun configure(props: Properties?) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.time.Duration

private val log = KotlinLogging.logger {}

/** Mysql-specific implementation of [SourceConfiguration] */
data class MysqlSourceConfiguration(
/** MySQL-specific implementation of [SourceConfiguration] */
data class MySqlSourceConfiguration(
override val realHost: String,
override val realPort: Int,
override val sshTunnel: SshTunnelMethodConfiguration?,
Expand All @@ -41,16 +41,16 @@ data class MysqlSourceConfiguration(
) : JdbcSourceConfiguration, CdcSourceConfiguration {
override val global = incrementalConfiguration is CdcIncrementalConfiguration

/** Required to inject [MysqlSourceConfiguration] directly. */
/** Required to inject [MySqlSourceConfiguration] directly. */
@Factory
private class MicronautFactory {
@Singleton
fun mysqlSourceConfig(
factory:
SourceConfigurationFactory<
MysqlSourceConfigurationSpecification, MysqlSourceConfiguration>,
supplier: ConfigurationSpecificationSupplier<MysqlSourceConfigurationSpecification>,
): MysqlSourceConfiguration = factory.make(supplier.get())
MySqlSourceConfigurationSpecification, MySqlSourceConfiguration>,
supplier: ConfigurationSpecificationSupplier<MySqlSourceConfigurationSpecification>,
): MySqlSourceConfiguration = factory.make(supplier.get())
}
}

Expand All @@ -71,14 +71,14 @@ enum class InvalidCdcCursorPositionBehavior {
}

@Singleton
class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MysqlSourceConfigurationSpecification, MysqlSourceConfiguration> {
class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MySqlSourceConfigurationSpecification, MySqlSourceConfiguration> {

constructor() : this(emptySet())

override fun makeWithoutExceptionHandling(
pojo: MysqlSourceConfigurationSpecification,
): MysqlSourceConfiguration {
pojo: MySqlSourceConfigurationSpecification,
): MySqlSourceConfiguration {
val realHost: String = pojo.host
val realPort: Int = pojo.port
val sshTunnel: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue()
Expand Down Expand Up @@ -115,20 +115,21 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
"SSL encryption or an SSH tunnel."
)
}
MysqlJdbcEncryption(sslMode = SSLMode.PREFERRED)
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.PREFERRED)
}
is EncryptionRequired -> MysqlJdbcEncryption(sslMode = SSLMode.REQUIRED)
is EncryptionRequired ->
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.REQUIRED)
is SslVerifyCertificate ->
MysqlJdbcEncryption(
sslMode = SSLMode.VERIFY_CA,
MySqlSourceEncryption(
sslMode = MySqlSourceEncryption.SslMode.VERIFY_CA,
caCertificate = encryption.sslCertificate,
clientCertificate = encryption.sslClientCertificate,
clientKey = encryption.sslClientKey,
clientKeyPassword = encryption.sslClientPassword
)
is SslVerifyIdentity ->
MysqlJdbcEncryption(
sslMode = SSLMode.VERIFY_IDENTITY,
MySqlSourceEncryption(
sslMode = MySqlSourceEncryption.SslMode.VERIFY_IDENTITY,
caCertificate = encryption.sslCertificate,
clientCertificate = encryption.sslClientCertificate,
clientKey = encryption.sslClientKey,
Expand Down Expand Up @@ -178,7 +179,7 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
},
)
}
return MysqlSourceConfiguration(
return MySqlSourceConfiguration(
realHost = realHost,
realPort = realPort,
sshTunnel = sshTunnel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import io.micronaut.context.annotation.ConfigurationProperties
import jakarta.inject.Singleton

/**
* The object which is mapped to the Mysql source configuration JSON.
* The object which is mapped to the MySQL source configuration JSON.
*
* Use [MysqlSourceConfiguration] instead wherever possible. This object also allows injecting
* Use [MySqlSourceConfiguration] instead wherever possible. This object also allows injecting
* values through Micronaut properties, this is made possible by the classes named
* `MicronautPropertiesFriendly.*`.
*/
@JsonSchemaTitle("Mysql Source Spec")
@JsonSchemaTitle("MySQL Source Spec")
@JsonPropertyOrder(
value = ["host", "port", "database", "username", "replication_method"],
)
@Singleton
@ConfigurationProperties(CONNECTOR_CONFIG_PREFIX)
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
class MysqlSourceConfigurationSpecification : ConfigurationSpecification() {
class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
@JsonProperty("host")
@JsonSchemaTitle("Host")
@JsonSchemaInject(json = """{"order":1}""")
Expand Down Expand Up @@ -320,7 +320,7 @@ data object UserDefinedCursor : CursorMethodConfiguration
@JsonSchemaTitle("Read Changes using Change Data Capture (CDC)")
@JsonSchemaDescription(
"<i>Recommended</i> - " +
"Incrementally reads new inserts, updates, and deletes using Mysql's <a href=" +
"Incrementally reads new inserts, updates, and deletes using MySQL's <a href=" +
"\"https://docs.airbyte.com/integrations/sources/mssql/#change-data-capture-cdc\"" +
"> change data capture feature</a>. This must be enabled on your database.",
)
Expand Down
Loading
Loading