From 420477d9c46235db5c7ab54123404a47d6b38af6 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Thu, 19 Dec 2024 08:58:30 -0500 Subject: [PATCH] source-mysql: normalize class names --- .../connectors/source-mysql/build.gradle | 4 +- .../mysql/{MysqlSource.kt => MySqlSource.kt} | 2 +- ...r.kt => MySqlSourceCdcCustomConverters.kt} | 49 ++++++- ...ySqlSourceCdcInitialSnapshotStateValue.kt} | 6 +- ...aFields.kt => MySqlSourceCdcMetaFields.kt} | 2 +- ...lPosition.kt => MySqlSourceCdcPosition.kt} | 8 +- ...uration.kt => MySqlSourceConfiguration.kt} | 35 ++--- ... MySqlSourceConfigurationSpecification.kt} | 10 +- ...ns.kt => MySqlSourceDebeziumOperations.kt} | 76 +++++----- ...Encryption.kt => MySqlSourceEncryption.kt} | 138 +++++++++--------- ...rtition.kt => MySqlSourceJdbcPartition.kt} | 62 ++++---- ....kt => MySqlSourceJdbcPartitionFactory.kt} | 55 +++---- ....kt => MySqlSourceJdbcStreamStateValue.kt} | 29 ++-- ...erier.kt => MySqlSourceMetadataQuerier.kt} | 16 +- ...Operations.kt => MySqlSourceOperations.kt} | 23 ++- ...Querier.kt => MySqlSourceSelectQuerier.kt} | 8 +- .../cdc/converters/MySqlBooleanConverter.kt | 56 ------- ...nerFactory.kt => MySqlContainerFactory.kt} | 22 ++- ...st.kt => MySqlSourceCdcIntegrationTest.kt} | 28 ++-- ...qlSourceConfigurationSpecificationTest.kt} | 15 +- ...est.kt => MySqlSourceConfigurationTest.kt} | 46 ++---- ... MySqlSourceCursorBasedIntegrationTest.kt} | 12 +- ... => MySqlSourceDatatypeIntegrationTest.kt} | 102 ++++++------- ...=> MySqlSourceJdbcPartitionFactoryTest.kt} | 58 ++++---- ...=> MySqlSourceSelectQueryGeneratorTest.kt} | 4 +- ...t.kt => MySqlSourceSpecIntegrationTest.kt} | 2 +- ...=> MySqlSourceTestConfigurationFactory.kt} | 10 +- .../src/test/resources/expected-spec.json | 4 +- 28 files changed, 427 insertions(+), 455 deletions(-) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlSource.kt => MySqlSource.kt} (92%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{cdc/converters/MySqlTemporalConverter.kt => MySqlSourceCdcCustomConverters.kt} (66%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlCdcInitialSnapshotStateValue.kt => MySqlSourceCdcInitialSnapshotStateValue.kt} (91%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlCdcMetaFields.kt => MySqlSourceCdcMetaFields.kt} (93%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{cdc/MySqlPosition.kt => MySqlSourceCdcPosition.kt} (80%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceConfiguration.kt => MySqlSourceConfiguration.kt} (87%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceConfigurationSpecification.kt => MySqlSourceConfigurationSpecification.kt} (98%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{cdc/MySqlDebeziumOperations.kt => MySqlSourceDebeziumOperations.kt} (90%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlJdbcEncryption.kt => MySqlSourceEncryption.kt} (89%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlJdbcPartition.kt => MySqlSourceJdbcPartition.kt} (85%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlJdbcPartitionFactory.kt => MySqlSourceJdbcPartitionFactory.kt} (90%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlJdbcStreamStateValue.kt => MySqlSourceJdbcStreamStateValue.kt} (84%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceMetadataQuerier.kt => MySqlSourceMetadataQuerier.kt} (95%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceOperations.kt => MySqlSourceOperations.kt} (93%) rename airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/{MysqlSelectQuerier.kt => MySqlSourceSelectQuerier.kt} (90%) delete mode 100644 airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySqlBooleanConverter.kt rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlContainerFactory.kt => MySqlContainerFactory.kt} (83%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlCdcIntegrationTest.kt => MySqlSourceCdcIntegrationTest.kt} (88%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceConfigurationSpecificationTest.kt => MySqlSourceConfigurationSpecificationTest.kt} (90%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceConfigurationTest.kt => MySqlSourceConfigurationTest.kt} (81%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlCursorBasedIntegrationTest.kt => MySqlSourceCursorBasedIntegrationTest.kt} (96%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MySqlDatatypeIntegrationTest.kt => MySqlSourceDatatypeIntegrationTest.kt} (78%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlJdbcPartitionFactoryTest.kt => MySqlSourceJdbcPartitionFactoryTest.kt} (83%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceSelectQueryGeneratorTest.kt => MySqlSourceSelectQueryGeneratorTest.kt} (97%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlSpecIntegrationTest.kt => MySqlSourceSpecIntegrationTest.kt} (87%) rename airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/{MysqlSourceTestConfigurationFactory.kt => MySqlSourceTestConfigurationFactory.kt} (73%) diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 8bf6b20e09da..7313a068944e 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -3,7 +3,7 @@ plugins { } application { - mainClass = 'io.airbyte.integrations.source.mysql.MysqlSource' + mainClass = 'io.airbyte.integrations.source.mysql.MySqlSource' } airbyteBulkConnector { @@ -14,10 +14,8 @@ airbyteBulkConnector { dependencies { implementation 'com.mysql:mysql-connector-j:9.1.0' - implementation 'org.codehaus.plexus:plexus-utils:4.0.0' implementation 'io.debezium:debezium-connector-mysql' - testImplementation platform('org.testcontainers:testcontainers-bom:1.20.2') testImplementation 'org.testcontainers:mysql' testImplementation("io.mockk:mockk:1.12.0") } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSource.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSource.kt similarity index 92% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSource.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSource.kt index c46a2c053471..29caa56c7aaf 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSource.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSource.kt @@ -3,7 +3,7 @@ package io.airbyte.integrations.source.mysql import io.airbyte.cdk.AirbyteSourceRunner -object MysqlSource { +object MySqlSource { @JvmStatic fun main(args: Array) { AirbyteSourceRunner.run(*args) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySqlTemporalConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcCustomConverters.kt similarity index 66% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySqlTemporalConverter.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcCustomConverters.kt index 87fa7127e18b..b4eb7259a448 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySqlTemporalConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcCustomConverters.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc.converters +package io.airbyte.integrations.source.mysql import io.airbyte.cdk.data.LocalDateCodec import io.airbyte.cdk.data.LocalDateTimeCodec @@ -20,7 +20,52 @@ import java.time.LocalTime import java.time.ZonedDateTime import org.apache.kafka.connect.data.SchemaBuilder -class MySqlTemporalConverter : RelationalColumnCustomConverter { +class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter { + + override val debeziumPropertiesKey: String = "boolean" + override val handlers: List = + listOf(booleanHandler, tinyint1Handler) + + companion object { + val booleanHandler = + RelationalColumnCustomConverter.Handler( + predicate = { it.typeName().startsWith("BOOL", ignoreCase = true) }, + outputSchema = SchemaBuilder.bool(), + partialConverters = + listOf( + PartialConverter { + when (it) { + null -> Converted(false) + is Boolean -> Converted(it) + else -> NoConversion + } + } + ) + ) + + val tinyint1Handler = + RelationalColumnCustomConverter.Handler( + predicate = { + it.typeName().equals("TINYINT", ignoreCase = true) && + it.length().isPresent && + it.length().asInt == 1 + }, + outputSchema = SchemaBuilder.bool(), + partialConverters = + listOf( + PartialConverter { + when (it) { + null -> Converted(false) + is Number -> Converted(it != 0) + else -> NoConversion + } + } + ) + ) + } +} + +class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter { override val debeziumPropertiesKey: String = "temporal" diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcInitialSnapshotStateValue.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcInitialSnapshotStateValue.kt similarity index 91% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcInitialSnapshotStateValue.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcInitialSnapshotStateValue.kt index 55f3c10a8db4..621966c080a5 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcInitialSnapshotStateValue.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcInitialSnapshotStateValue.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.discover.Field import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons -data class MysqlCdcInitialSnapshotStateValue( +data class MySqlSourceCdcInitialSnapshotStateValue( @JsonProperty("pk_val") val pkVal: String? = null, @JsonProperty("pk_name") val pkName: String? = null, @JsonProperty("version") val version: Int? = null, @@ -25,7 +25,7 @@ data class MysqlCdcInitialSnapshotStateValue( /** Value representing the completion of a FULL_REFRESH snapshot. */ fun getSnapshotCompletedState(stream: Stream): OpaqueStateValue = Jsons.valueToTree( - MysqlCdcInitialSnapshotStateValue( + MySqlSourceCdcInitialSnapshotStateValue( streamName = stream.name, cursorField = listOf(), streamNamespace = stream.namespace @@ -42,7 +42,7 @@ data class MysqlCdcInitialSnapshotStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlCdcInitialSnapshotStateValue( + MySqlSourceCdcInitialSnapshotStateValue( pkName = primaryKeyField.id, pkVal = primaryKeyCheckpoint.first().asText(), stateType = "primary_key", diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcMetaFields.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcMetaFields.kt similarity index 93% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcMetaFields.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcMetaFields.kt index d8f4ad6de14a..c079bb0c0202 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcMetaFields.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcMetaFields.kt @@ -9,7 +9,7 @@ import io.airbyte.cdk.discover.CdcStringMetaFieldType import io.airbyte.cdk.discover.FieldType import io.airbyte.cdk.discover.MetaField -enum class MysqlCdcMetaFields( +enum class MySqlSourceCdcMetaFields( override val type: FieldType, ) : MetaField { CDC_CURSOR(CdcIntegerMetaFieldType), diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlPosition.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcPosition.kt similarity index 80% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlPosition.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcPosition.kt index bcb7abefbcfd..13598aba4b89 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlPosition.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcPosition.kt @@ -2,13 +2,14 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc +package io.airbyte.integrations.source.mysql import kotlin.io.path.Path import kotlin.io.path.extension /** WAL position datum for MySQL. */ -data class MySqlPosition(val fileName: String, val position: Long) : Comparable { +data class MySqlSourceCdcPosition(val fileName: String, val position: Long) : + Comparable { /** * Numerical value encoded in the extension of the binlog file name. @@ -31,5 +32,6 @@ data class MySqlPosition(val fileName: String, val position: Long) : Comparable< val cursorValue: Long get() = (fileExtension.toLong() shl Int.SIZE_BITS) or position - override fun compareTo(other: MySqlPosition): Int = cursorValue.compareTo(other.cursorValue) + override fun compareTo(other: MySqlSourceCdcPosition): Int = + cursorValue.compareTo(other.cursorValue) } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfiguration.kt similarity index 87% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfiguration.kt index b5c50a5b237d..7e342d020925 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfiguration.kt @@ -21,8 +21,8 @@ import java.time.Duration private val log = KotlinLogging.logger {} -/** Mysql-specific implementation of [SourceConfiguration] */ -data class MysqlSourceConfiguration( +/** MySQL-specific implementation of [SourceConfiguration] */ +data class MySqlSourceConfiguration( override val realHost: String, override val realPort: Int, override val sshTunnel: SshTunnelMethodConfiguration?, @@ -41,16 +41,16 @@ data class MysqlSourceConfiguration( ) : JdbcSourceConfiguration, CdcSourceConfiguration { override val global = incrementalConfiguration is CdcIncrementalConfiguration - /** Required to inject [MysqlSourceConfiguration] directly. */ + /** Required to inject [MySqlSourceConfiguration] directly. */ @Factory private class MicronautFactory { @Singleton fun mysqlSourceConfig( factory: SourceConfigurationFactory< - MysqlSourceConfigurationSpecification, MysqlSourceConfiguration>, - supplier: ConfigurationSpecificationSupplier, - ): MysqlSourceConfiguration = factory.make(supplier.get()) + MySqlSourceConfigurationSpecification, MySqlSourceConfiguration>, + supplier: ConfigurationSpecificationSupplier, + ): MySqlSourceConfiguration = factory.make(supplier.get()) } } @@ -71,14 +71,14 @@ enum class InvalidCdcCursorPositionBehavior { } @Singleton -class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set) : - SourceConfigurationFactory { +class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set) : + SourceConfigurationFactory { constructor() : this(emptySet()) override fun makeWithoutExceptionHandling( - pojo: MysqlSourceConfigurationSpecification, - ): MysqlSourceConfiguration { + pojo: MySqlSourceConfigurationSpecification, + ): MySqlSourceConfiguration { val realHost: String = pojo.host val realPort: Int = pojo.port val sshTunnel: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue() @@ -115,20 +115,21 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set< "SSL encryption or an SSH tunnel." ) } - MysqlJdbcEncryption(sslMode = SSLMode.PREFERRED) + MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.PREFERRED) } - is EncryptionRequired -> MysqlJdbcEncryption(sslMode = SSLMode.REQUIRED) + is EncryptionRequired -> + MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.REQUIRED) is SslVerifyCertificate -> - MysqlJdbcEncryption( - sslMode = SSLMode.VERIFY_CA, + MySqlSourceEncryption( + sslMode = MySqlSourceEncryption.SslMode.VERIFY_CA, caCertificate = encryption.sslCertificate, clientCertificate = encryption.sslClientCertificate, clientKey = encryption.sslClientKey, clientKeyPassword = encryption.sslClientPassword ) is SslVerifyIdentity -> - MysqlJdbcEncryption( - sslMode = SSLMode.VERIFY_IDENTITY, + MySqlSourceEncryption( + sslMode = MySqlSourceEncryption.SslMode.VERIFY_IDENTITY, caCertificate = encryption.sslCertificate, clientCertificate = encryption.sslClientCertificate, clientKey = encryption.sslClientKey, @@ -178,7 +179,7 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set< }, ) } - return MysqlSourceConfiguration( + return MySqlSourceConfiguration( realHost = realHost, realPort = realPort, sshTunnel = sshTunnel, diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecification.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecification.kt similarity index 98% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecification.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecification.kt index f2741de20053..1c20cd6de425 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecification.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecification.kt @@ -27,20 +27,20 @@ import io.micronaut.context.annotation.ConfigurationProperties import jakarta.inject.Singleton /** - * The object which is mapped to the Mysql source configuration JSON. + * The object which is mapped to the MySQL source configuration JSON. * - * Use [MysqlSourceConfiguration] instead wherever possible. This object also allows injecting + * Use [MySqlSourceConfiguration] instead wherever possible. This object also allows injecting * values through Micronaut properties, this is made possible by the classes named * `MicronautPropertiesFriendly.*`. */ -@JsonSchemaTitle("Mysql Source Spec") +@JsonSchemaTitle("MySQL Source Spec") @JsonPropertyOrder( value = ["host", "port", "database", "username", "replication_method"], ) @Singleton @ConfigurationProperties(CONNECTOR_CONFIG_PREFIX) @SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI") -class MysqlSourceConfigurationSpecification : ConfigurationSpecification() { +class MySqlSourceConfigurationSpecification : ConfigurationSpecification() { @JsonProperty("host") @JsonSchemaTitle("Host") @JsonSchemaInject(json = """{"order":1}""") @@ -320,7 +320,7 @@ data object UserDefinedCursor : CursorMethodConfiguration @JsonSchemaTitle("Read Changes using Change Data Capture (CDC)") @JsonSchemaDescription( "Recommended - " + - "Incrementally reads new inserts, updates, and deletes using Mysql's change data capture feature. This must be enabled on your database.", ) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt similarity index 90% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt index b5cb1920ba73..3256101a0a24 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc +package io.airbyte.integrations.source.mysql import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ArrayNode @@ -32,12 +32,6 @@ import io.airbyte.cdk.read.cdc.DebeziumState import io.airbyte.cdk.read.cdc.DeserializedRecord import io.airbyte.cdk.ssh.TunnelSession import io.airbyte.cdk.util.Jsons -import io.airbyte.integrations.source.mysql.CdcIncrementalConfiguration -import io.airbyte.integrations.source.mysql.InvalidCdcCursorPositionBehavior -import io.airbyte.integrations.source.mysql.MysqlCdcMetaFields -import io.airbyte.integrations.source.mysql.MysqlSourceConfiguration -import io.airbyte.integrations.source.mysql.cdc.converters.MySqlBooleanConverter -import io.airbyte.integrations.source.mysql.cdc.converters.MySqlTemporalConverter import io.debezium.connector.mysql.MySqlConnector import io.debezium.connector.mysql.gtid.MySqlGtidSet import io.debezium.document.DocumentReader @@ -63,11 +57,11 @@ import org.apache.kafka.connect.source.SourceRecord import org.apache.mina.util.Base64 @Singleton -class MySqlDebeziumOperations( +class MySqlSourceDebeziumOperations( val jdbcConnectionFactory: JdbcConnectionFactory, - val configuration: MysqlSourceConfiguration, + val configuration: MySqlSourceConfiguration, random: Random = Random.Default, -) : DebeziumOperations { +) : DebeziumOperations { private val log = KotlinLogging.logger {} override fun deserialize( @@ -112,11 +106,20 @@ class MySqlDebeziumOperations( if (isDelete) transactionTimestampJsonNode else Jsons.nullNode(), ) // Set _ab_cdc_log_file and _ab_cdc_log_pos meta-field values. - val position = MySqlPosition(source["file"].asText(), source["pos"].asLong()) - data.set(MysqlCdcMetaFields.CDC_LOG_FILE.id, TextCodec.encode(position.fileName)) - data.set(MysqlCdcMetaFields.CDC_LOG_POS.id, LongCodec.encode(position.position)) + val position = MySqlSourceCdcPosition(source["file"].asText(), source["pos"].asLong()) + data.set( + MySqlSourceCdcMetaFields.CDC_LOG_FILE.id, + TextCodec.encode(position.fileName) + ) + data.set( + MySqlSourceCdcMetaFields.CDC_LOG_POS.id, + LongCodec.encode(position.position) + ) // Set the _ab_cdc_cursor meta-field value. - data.set(MysqlCdcMetaFields.CDC_CURSOR.id, LongCodec.encode(position.cursorValue)) + data.set( + MySqlSourceCdcMetaFields.CDC_CURSOR.id, + LongCodec.encode(position.cursorValue) + ) // Return a DeserializedRecord instance. return DeserializedRecord(data, changes = emptyMap()) } @@ -135,7 +138,7 @@ class MySqlDebeziumOperations( */ private fun validate(debeziumState: DebeziumState): CdcStateValidateResult { val savedStateOffset: SavedOffset = parseSavedOffset(debeziumState) - val (_: MySqlPosition, gtidSet: String?) = queryPositionAndGtids() + val (_: MySqlSourceCdcPosition, gtidSet: String?) = queryPositionAndGtids() if (gtidSet.isNullOrEmpty() && !savedStateOffset.gtidSet.isNullOrEmpty()) { log.info { "Connector used GTIDs previously, but MySQL server does not know of any GTIDs or they are not enabled" @@ -201,12 +204,12 @@ class MySqlDebeziumOperations( } private fun parseSavedOffset(debeziumState: DebeziumState): SavedOffset { - val position: MySqlPosition = position(debeziumState.offset) + val position: MySqlSourceCdcPosition = position(debeziumState.offset) val gtidSet: String? = debeziumState.offset.wrapped.values.first()["gtids"]?.asText() return SavedOffset(position, gtidSet) } - data class SavedOffset(val position: MySqlPosition, val gtidSet: String?) + data class SavedOffset(val position: MySqlSourceCdcPosition, val gtidSet: String?) enum class CdcStateValidateResult { VALID, @@ -214,23 +217,25 @@ class MySqlDebeziumOperations( INVALID_RESET } - override fun position(offset: DebeziumOffset): MySqlPosition = Companion.position(offset) + override fun position(offset: DebeziumOffset): MySqlSourceCdcPosition = + Companion.position(offset) - override fun position(recordValue: DebeziumRecordValue): MySqlPosition? { + override fun position(recordValue: DebeziumRecordValue): MySqlSourceCdcPosition? { val file: JsonNode = recordValue.source["file"]?.takeIf { it.isTextual } ?: return null val pos: JsonNode = recordValue.source["pos"]?.takeIf { it.isIntegralNumber } ?: return null - return MySqlPosition(file.asText(), pos.asLong()) + return MySqlSourceCdcPosition(file.asText(), pos.asLong()) } - override fun position(sourceRecord: SourceRecord): MySqlPosition? { + override fun position(sourceRecord: SourceRecord): MySqlSourceCdcPosition? { val offset: Map = sourceRecord.sourceOffset() val file: Any = offset["file"] ?: return null val pos: Long = offset["pos"] as? Long ?: return null - return MySqlPosition(file.toString(), pos) + return MySqlSourceCdcPosition(file.toString(), pos) } override fun synthesize(): DebeziumInput { - val (mySqlPosition: MySqlPosition, gtidSet: String?) = queryPositionAndGtids() + val (mySqlSourceCdcPosition: MySqlSourceCdcPosition, gtidSet: String?) = + queryPositionAndGtids() val topicPrefixName: String = DebeziumPropertiesBuilder.sanitizeTopicPrefix(databaseName) val timestamp: Instant = Instant.now() val key: ArrayNode = @@ -241,8 +246,8 @@ class MySqlDebeziumOperations( val value: ObjectNode = Jsons.objectNode().apply { put("ts_sec", timestamp.epochSecond) - put("file", mySqlPosition.fileName) - put("pos", mySqlPosition.position) + put("file", mySqlSourceCdcPosition.fileName) + put("pos", mySqlSourceCdcPosition.position) if (gtidSet != null) { put("gtids", gtidSet) } @@ -253,7 +258,7 @@ class MySqlDebeziumOperations( return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - private fun queryPositionAndGtids(): Pair { + private fun queryPositionAndGtids(): Pair { val file = Field("File", StringFieldType) val pos = Field("Position", LongFieldType) val gtids = Field("Executed_Gtid_Set", StringFieldType) @@ -262,8 +267,8 @@ class MySqlDebeziumOperations( val sql = "SHOW MASTER STATUS" stmt.executeQuery(sql).use { rs: ResultSet -> if (!rs.next()) throw ConfigErrorException("No results for query: $sql") - val mySqlPosition = - MySqlPosition( + val mySqlSourceCdcPosition = + MySqlSourceCdcPosition( fileName = rs.getString(file.id)?.takeUnless { rs.wasNull() } ?: throw ConfigErrorException( "No value for ${file.id} in: $sql", @@ -275,7 +280,7 @@ class MySqlDebeziumOperations( ) if (rs.metaData.columnCount <= 4) { // This value exists only in MySQL 5.6.5 or later. - return mySqlPosition to null + return mySqlSourceCdcPosition to null } val gtidSet: String? = rs.getString(gtids.id) @@ -283,7 +288,7 @@ class MySqlDebeziumOperations( ?.trim() ?.replace("\n", "") ?.replace("\r", "") - return mySqlPosition to gtidSet + return mySqlSourceCdcPosition to gtidSet } } } @@ -303,7 +308,7 @@ class MySqlDebeziumOperations( } private fun getBinaryLogFileNames(): List { - // Very old Mysql version (4.x) has different output of SHOW BINARY LOGS output. + // Very old MySQL version (4.x) has different output of SHOW BINARY LOGS output. return jdbcConnectionFactory.get().use { connection: Connection -> connection.createStatement().use { stmt: Statement -> val sql = "SHOW BINARY LOGS" @@ -417,7 +422,10 @@ class MySqlDebeziumOperations( .withDatabase("include.list", databaseName) .withOffset() .withSchemaHistory() - .withConverters(MySqlBooleanConverter::class, MySqlTemporalConverter::class) + .withConverters( + MySqlSourceCdcBooleanConverter::class, + MySqlSourceCdcTemporalConverter::class + ) val serverTimezone: String? = (configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone @@ -499,12 +507,12 @@ class MySqlDebeziumOperations( return DebeziumState(offset, DebeziumSchemaHistory(schemaHistoryList)) } - internal fun position(offset: DebeziumOffset): MySqlPosition { + internal fun position(offset: DebeziumOffset): MySqlSourceCdcPosition { if (offset.wrapped.size != 1) { throw ConfigErrorException("Expected exactly 1 key in $offset") } val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode - return MySqlPosition(offsetValue["file"].asText(), offsetValue["pos"].asLong()) + return MySqlSourceCdcPosition(offsetValue["file"].asText(), offsetValue["pos"].asLong()) } } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceEncryption.kt similarity index 89% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceEncryption.kt index f7bf5a456144..c77ece2130da 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceEncryption.kt @@ -5,7 +5,6 @@ package io.airbyte.integrations.source.mysql import io.airbyte.cdk.ConfigErrorException -import io.airbyte.cdk.SystemErrorException import io.airbyte.cdk.jdbc.SSLCertificateUtils import io.github.oshai.kotlinlogging.KotlinLogging import java.net.MalformedURLException @@ -15,22 +14,68 @@ import java.util.UUID private val log = KotlinLogging.logger {} -class MysqlJdbcEncryption( - val sslMode: SSLMode = SSLMode.PREFERRED, +class MySqlSourceEncryption( + val sslMode: SslMode = SslMode.PREFERRED, val caCertificate: String? = null, val clientCertificate: String? = null, val clientKey: String? = null, val clientKeyPassword: String? = null, ) { - companion object { - const val TRUST_KEY_STORE_URL: String = "trustCertificateKeyStoreUrl" - const val TRUST_KEY_STORE_PASS: String = "trustCertificateKeyStorePassword" - const val CLIENT_KEY_STORE_URL: String = "clientCertificateKeyStoreUrl" - const val CLIENT_KEY_STORE_PASS: String = "clientCertificateKeyStorePassword" - const val CLIENT_KEY_STORE_TYPE: String = "clientCertificateKeyStoreType" - const val TRUST_KEY_STORE_TYPE: String = "trustCertificateKeyStoreType" - const val KEY_STORE_TYPE_PKCS12: String = "PKCS12" - const val SSL_MODE: String = "sslMode" + + /** + * Enum representing the SSL mode for MySQL connections. The actual jdbc property name is the + * lower case of the enum name. + */ + enum class SslMode { + PREFERRED, + REQUIRED, + VERIFY_CA, + VERIFY_IDENTITY, + } + + fun parseSSLConfig(): Map { + var caCertKeyStorePair: Pair? + var clientCertKeyStorePair: Pair? + val additionalParameters: MutableMap = mutableMapOf() + + additionalParameters[SSL_MODE] = sslMode.name.lowercase() + + caCertKeyStorePair = prepareCACertificateKeyStore() + + if (null != caCertKeyStorePair) { + log.debug { "uri for ca cert keystore: ${caCertKeyStorePair.first}" } + try { + additionalParameters.putAll( + mapOf( + TRUST_KEY_STORE_URL to caCertKeyStorePair.first.toURL().toString(), + TRUST_KEY_STORE_PASS to caCertKeyStorePair.second, + TRUST_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 + ) + ) + } catch (e: MalformedURLException) { + throw ConfigErrorException("Unable to get a URL for trust key store") + } + + clientCertKeyStorePair = prepareClientCertificateKeyStore() + + if (null != clientCertKeyStorePair) { + log.debug { + "uri for client cert keystore: ${clientCertKeyStorePair.first} / ${clientCertKeyStorePair.second}" + } + try { + additionalParameters.putAll( + mapOf( + CLIENT_KEY_STORE_URL to clientCertKeyStorePair.first.toURL().toString(), + CLIENT_KEY_STORE_PASS to clientCertKeyStorePair.second, + CLIENT_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 + ) + ) + } catch (e: MalformedURLException) { + throw ConfigErrorException("Unable to get a URL for client key store") + } + } + } + return additionalParameters } private fun getOrGeneratePassword(): String { @@ -85,67 +130,14 @@ class MysqlJdbcEncryption( return clientCertKeyStorePair } - fun parseSSLConfig(): Map { - var caCertKeyStorePair: Pair? - var clientCertKeyStorePair: Pair? - val additionalParameters: MutableMap = mutableMapOf() - - additionalParameters[SSL_MODE] = sslMode.name.lowercase() - - caCertKeyStorePair = prepareCACertificateKeyStore() - - if (null != caCertKeyStorePair) { - log.debug { "uri for ca cert keystore: ${caCertKeyStorePair.first}" } - try { - additionalParameters.putAll( - mapOf( - TRUST_KEY_STORE_URL to caCertKeyStorePair.first.toURL().toString(), - TRUST_KEY_STORE_PASS to caCertKeyStorePair.second, - TRUST_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 - ) - ) - } catch (e: MalformedURLException) { - throw ConfigErrorException("Unable to get a URL for trust key store") - } - - clientCertKeyStorePair = prepareClientCertificateKeyStore() - - if (null != clientCertKeyStorePair) { - log.debug { - "uri for client cert keystore: ${clientCertKeyStorePair.first} / ${clientCertKeyStorePair.second}" - } - try { - additionalParameters.putAll( - mapOf( - CLIENT_KEY_STORE_URL to clientCertKeyStorePair.first.toURL().toString(), - CLIENT_KEY_STORE_PASS to clientCertKeyStorePair.second, - CLIENT_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 - ) - ) - } catch (e: MalformedURLException) { - throw ConfigErrorException("Unable to get a URL for client key store") - } - } - } - return additionalParameters - } -} - -/** - * Enum representing the SSL mode for MySQL connections. The actual jdbc property name is the lower - * case of the enum name. - */ -enum class SSLMode() { - PREFERRED, - REQUIRED, - VERIFY_CA, - VERIFY_IDENTITY; - companion object { - - fun fromJdbcPropertyName(jdbcPropertyName: String): SSLMode { - return SSLMode.values().find { it.name.equals(jdbcPropertyName, ignoreCase = true) } - ?: throw SystemErrorException("Unknown SSL mode: $jdbcPropertyName") - } + const val TRUST_KEY_STORE_URL: String = "trustCertificateKeyStoreUrl" + const val TRUST_KEY_STORE_PASS: String = "trustCertificateKeyStorePassword" + const val CLIENT_KEY_STORE_URL: String = "clientCertificateKeyStoreUrl" + const val CLIENT_KEY_STORE_PASS: String = "clientCertificateKeyStorePassword" + const val CLIENT_KEY_STORE_TYPE: String = "clientCertificateKeyStoreType" + const val TRUST_KEY_STORE_TYPE: String = "trustCertificateKeyStoreType" + const val KEY_STORE_TYPE_PKCS12: String = "PKCS12" + const val SSL_MODE: String = "sslMode" } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartition.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartition.kt similarity index 85% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartition.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartition.kt index 2e9191cc37c4..ecb94853c552 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartition.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartition.kt @@ -37,7 +37,7 @@ import io.airbyte.cdk.read.optimize import io.airbyte.cdk.util.Jsons /** Base class for default implementations of [JdbcPartition] for non resumable partitions. */ -sealed class MysqlJdbcPartition( +sealed class MySqlSourceJdbcPartition( val selectQueryGenerator: SelectQueryGenerator, streamState: DefaultJdbcStreamState, ) : JdbcPartition { @@ -62,29 +62,29 @@ sealed class MysqlJdbcPartition( } /** Default implementation of a [JdbcPartition] for an unsplittable snapshot partition. */ -class MysqlJdbcNonResumableSnapshotPartition( +class MySqlSourceJdbcNonResumableSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, -) : MysqlJdbcPartition(selectQueryGenerator, streamState) { +) : MySqlSourceJdbcPartition(selectQueryGenerator, streamState) { - override val completeState: OpaqueStateValue = MysqlJdbcStreamStateValue.snapshotCompleted + override val completeState: OpaqueStateValue = MySqlSourceJdbcStreamStateValue.snapshotCompleted } /** * Default implementation of a [JdbcPartition] for an non resumable snapshot partition preceding a * cursor-based incremental sync. */ -class MysqlJdbcNonResumableSnapshotWithCursorPartition( +class MySqlSourceJdbcNonResumableSnapshotWithCursorPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, val cursor: Field, ) : - MysqlJdbcPartition(selectQueryGenerator, streamState), + MySqlSourceJdbcPartition(selectQueryGenerator, streamState), JdbcCursorPartition { override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorCheckpoint = streamState.cursorUpperBound!!, streamState.stream, @@ -97,12 +97,12 @@ class MysqlJdbcNonResumableSnapshotWithCursorPartition( } /** Base class for default implementations of [JdbcPartition] for partitions. */ -sealed class MysqlJdbcResumablePartition( +sealed class MySqlSourceJdbcResumablePartition( selectQueryGenerator: SelectQueryGenerator, streamState: DefaultJdbcStreamState, val checkpointColumns: List, ) : - MysqlJdbcPartition(selectQueryGenerator, streamState), + MySqlSourceJdbcPartition(selectQueryGenerator, streamState), JdbcSplittablePartition { abstract val lowerBound: List? abstract val upperBound: List? @@ -179,49 +179,49 @@ sealed class MysqlJdbcResumablePartition( } /** RFR for cursor based read. */ -class MysqlJdbcRfrSnapshotPartition( +class MySqlSourceJdbcRfrSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, override val lowerBound: List?, override val upperBound: List?, -) : MysqlJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { +) : MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { // TODO: this needs to reflect lastRecord. Complete state needs to have last primary key value // in RFR case. override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.snapshotCheckpoint( + MySqlSourceJdbcStreamStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { upperBound?.get(0) ?: Jsons.nullNode() }, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlJdbcStreamStateValue.snapshotCheckpoint( + MySqlSourceJdbcStreamStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, ) } /** RFR for CDC. */ -class MysqlJdbcCdcRfrSnapshotPartition( +class MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, override val lowerBound: List?, override val upperBound: List?, -) : MysqlJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { +) : MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { override val completeState: OpaqueStateValue get() = - MysqlCdcInitialSnapshotStateValue.snapshotCheckpoint( + MySqlSourceCdcInitialSnapshotStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { upperBound?.get(0) ?: Jsons.nullNode() }, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlCdcInitialSnapshotStateValue.snapshotCheckpoint( + MySqlSourceCdcInitialSnapshotStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, ) @@ -231,18 +231,18 @@ class MysqlJdbcCdcRfrSnapshotPartition( * Implementation of a [JdbcPartition] for a CDC snapshot partition. Used for incremental CDC * initial sync. */ -class MysqlJdbcCdcSnapshotPartition( +class MySqlSourceJdbcCdcSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, override val lowerBound: List? -) : MysqlJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { +) : MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { override val upperBound: List? = null override val completeState: OpaqueStateValue - get() = MysqlCdcInitialSnapshotStateValue.getSnapshotCompletedState(stream) + get() = MySqlSourceCdcInitialSnapshotStateValue.getSnapshotCompletedState(stream) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlCdcInitialSnapshotStateValue.snapshotCheckpoint( + MySqlSourceCdcInitialSnapshotStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, ) @@ -251,14 +251,14 @@ class MysqlJdbcCdcSnapshotPartition( /** * Default implementation of a [JdbcPartition] for a splittable partition involving cursor columns. */ -sealed class MysqlJdbcCursorPartition( +sealed class MySqlSourceJdbcCursorPartition( selectQueryGenerator: SelectQueryGenerator, streamState: DefaultJdbcStreamState, checkpointColumns: List, val cursor: Field, private val explicitCursorUpperBound: JsonNode?, ) : - MysqlJdbcResumablePartition(selectQueryGenerator, streamState, checkpointColumns), + MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, checkpointColumns), JdbcCursorPartition { val cursorUpperBound: JsonNode @@ -274,7 +274,7 @@ sealed class MysqlJdbcCursorPartition( * Default implementation of a [JdbcPartition] for a splittable snapshot partition preceding a * cursor-based incremental sync. */ -class MysqlJdbcSnapshotWithCursorPartition( +class MySqlSourceJdbcSnapshotWithCursorPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, @@ -282,7 +282,7 @@ class MysqlJdbcSnapshotWithCursorPartition( cursor: Field, cursorUpperBound: JsonNode?, ) : - MysqlJdbcCursorPartition( + MySqlSourceJdbcCursorPartition( selectQueryGenerator, streamState, primaryKey, @@ -294,14 +294,14 @@ class MysqlJdbcSnapshotWithCursorPartition( override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorUpperBound, stream, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlJdbcStreamStateValue.snapshotWithCursorCheckpoint( + MySqlSourceJdbcStreamStateValue.snapshotWithCursorCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, cursor, @@ -313,7 +313,7 @@ class MysqlJdbcSnapshotWithCursorPartition( * Default implementation of a [JdbcPartition] for a cursor incremental partition. These are always * splittable. */ -class MysqlJdbcCursorIncrementalPartition( +class MySqlSourceJdbcCursorIncrementalPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, cursor: Field, @@ -321,7 +321,7 @@ class MysqlJdbcCursorIncrementalPartition( override val isLowerBoundIncluded: Boolean, cursorUpperBound: JsonNode?, ) : - MysqlJdbcCursorPartition( + MySqlSourceJdbcCursorPartition( selectQueryGenerator, streamState, listOf(cursor), @@ -335,14 +335,14 @@ class MysqlJdbcCursorIncrementalPartition( override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorCheckpoint = cursorUpperBound, stream, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorCheckpoint = lastRecord[cursor.id] ?: Jsons.nullNode(), stream, diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactory.kt similarity index 90% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactory.kt index 43235cae3996..b510e81ca406 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactory.kt @@ -36,15 +36,15 @@ import java.util.concurrent.ConcurrentHashMap @Primary @Singleton -class MysqlJdbcPartitionFactory( +class MySqlSourceJdbcPartitionFactory( override val sharedState: DefaultJdbcSharedState, - val selectQueryGenerator: MysqlSourceOperations, - val config: MysqlSourceConfiguration, + val selectQueryGenerator: MySqlSourceOperations, + val config: MySqlSourceConfiguration, ) : JdbcPartitionFactory< DefaultJdbcSharedState, DefaultJdbcStreamState, - MysqlJdbcPartition, + MySqlSourceJdbcPartition, > { private val streamStates = ConcurrentHashMap() @@ -75,13 +75,13 @@ class MysqlJdbcPartitionFactory( } } - private fun coldStart(streamState: DefaultJdbcStreamState): MysqlJdbcPartition { + private fun coldStart(streamState: DefaultJdbcStreamState): MySqlSourceJdbcPartition { val stream: Stream = streamState.stream val pkChosenFromCatalog: List = stream.configuredPrimaryKey ?: listOf() if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) { if (pkChosenFromCatalog.isEmpty()) { - return MysqlJdbcNonResumableSnapshotPartition( + return MySqlSourceJdbcNonResumableSnapshotPartition( selectQueryGenerator, streamState, ) @@ -89,7 +89,7 @@ class MysqlJdbcPartitionFactory( val upperBound = findPkUpperBound(stream, pkChosenFromCatalog) if (sharedState.configuration.global) { - return MysqlJdbcCdcRfrSnapshotPartition( + return MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -97,7 +97,7 @@ class MysqlJdbcPartitionFactory( upperBound = listOf(upperBound) ) } else { - return MysqlJdbcRfrSnapshotPartition( + return MySqlSourceJdbcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -108,7 +108,7 @@ class MysqlJdbcPartitionFactory( } if (sharedState.configuration.global) { - return MysqlJdbcCdcSnapshotPartition( + return MySqlSourceJdbcCdcSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -120,13 +120,13 @@ class MysqlJdbcPartitionFactory( stream.configuredCursor as? Field ?: throw ConfigErrorException("no cursor") if (pkChosenFromCatalog.isEmpty()) { - return MysqlJdbcNonResumableSnapshotWithCursorPartition( + return MySqlSourceJdbcNonResumableSnapshotWithCursorPartition( selectQueryGenerator, streamState, cursorChosenFromCatalog ) } - return MysqlJdbcSnapshotWithCursorPartition( + return MySqlSourceJdbcSnapshotWithCursorPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -154,7 +154,7 @@ class MysqlJdbcPartitionFactory( * ii. In cursor read phase, use cursor incremental. * ``` */ - override fun create(streamFeedBootstrap: StreamFeedBootstrap): MysqlJdbcPartition? { + override fun create(streamFeedBootstrap: StreamFeedBootstrap): MySqlSourceJdbcPartition? { val stream: Stream = streamFeedBootstrap.feed val streamState: DefaultJdbcStreamState = streamState(streamFeedBootstrap) @@ -184,19 +184,22 @@ class MysqlJdbcPartitionFactory( ) { if ( streamState.streamFeedBootstrap.currentState == - MysqlJdbcStreamStateValue.snapshotCompleted + MySqlSourceJdbcStreamStateValue.snapshotCompleted ) { return null } - return MysqlJdbcNonResumableSnapshotPartition( + return MySqlSourceJdbcNonResumableSnapshotPartition( selectQueryGenerator, streamState, ) } if (!isCursorBased) { - val sv: MysqlCdcInitialSnapshotStateValue = - Jsons.treeToValue(opaqueStateValue, MysqlCdcInitialSnapshotStateValue::class.java) + val sv: MySqlSourceCdcInitialSnapshotStateValue = + Jsons.treeToValue( + opaqueStateValue, + MySqlSourceCdcInitialSnapshotStateValue::class.java + ) if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) { val upperBound = findPkUpperBound(stream, pkChosenFromCatalog) @@ -205,7 +208,7 @@ class MysqlJdbcPartitionFactory( } val pkLowerBound: JsonNode = stateValueToJsonNode(pkChosenFromCatalog[0], sv.pkVal) - return MysqlJdbcRfrSnapshotPartition( + return MySqlSourceJdbcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -230,7 +233,7 @@ class MysqlJdbcPartitionFactory( if (sv.pkVal == upperBound.asText()) { return null } - return MysqlJdbcCdcRfrSnapshotPartition( + return MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -238,7 +241,7 @@ class MysqlJdbcPartitionFactory( upperBound = listOf(upperBound) ) } - return MysqlJdbcCdcSnapshotPartition( + return MySqlSourceJdbcCdcSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -246,8 +249,8 @@ class MysqlJdbcPartitionFactory( ) } } else { - val sv: MysqlJdbcStreamStateValue = - Jsons.treeToValue(opaqueStateValue, MysqlJdbcStreamStateValue::class.java) + val sv: MySqlSourceJdbcStreamStateValue = + Jsons.treeToValue(opaqueStateValue, MySqlSourceJdbcStreamStateValue::class.java) if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) { val upperBound = findPkUpperBound(stream, pkChosenFromCatalog) @@ -257,7 +260,7 @@ class MysqlJdbcPartitionFactory( val pkLowerBound: JsonNode = stateValueToJsonNode(pkChosenFromCatalog[0], sv.pkValue) - return MysqlJdbcCdcRfrSnapshotPartition( + return MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -276,7 +279,7 @@ class MysqlJdbcPartitionFactory( stream.configuredCursor as? Field ?: throw ConfigErrorException("no cursor") // in a state where it's still in primary_key read part. - return MysqlJdbcSnapshotWithCursorPartition( + return MySqlSourceJdbcSnapshotWithCursorPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -299,7 +302,7 @@ class MysqlJdbcPartitionFactory( // Incremental complete. return null } - return MysqlJdbcCursorIncrementalPartition( + return MySqlSourceJdbcCursorIncrementalPartition( selectQueryGenerator, streamState, cursor, @@ -374,9 +377,9 @@ class MysqlJdbcPartitionFactory( } override fun split( - unsplitPartition: MysqlJdbcPartition, + unsplitPartition: MySqlSourceJdbcPartition, opaqueStateValues: List - ): List { + ): List { // At this moment we don't support split on within mysql stream in any mode. return listOf(unsplitPartition) } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcStreamStateValue.kt similarity index 84% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcStreamStateValue.kt index 68499f2fc206..53f755a06a5e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcStreamStateValue.kt @@ -11,10 +11,10 @@ import io.airbyte.cdk.discover.Field import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons -data class MysqlJdbcStreamStateValue( +data class MySqlSourceJdbcStreamStateValue( @JsonProperty("cursor") val cursors: String? = null, @JsonProperty("version") val version: Int = 2, - @JsonProperty("state_type") val stateType: String = StateType.CURSOR_BASED.stateType, + @JsonProperty("state_type") val stateType: String = StateType.CURSOR_BASED.serialized, @JsonProperty("stream_name") val streamName: String = "", @JsonProperty("cursor_field") val cursorField: List = listOf(), @JsonProperty("stream_namespace") val streamNamespace: String = "", @@ -26,7 +26,7 @@ data class MysqlJdbcStreamStateValue( companion object { /** Value representing the completion of a FULL_REFRESH snapshot. */ val snapshotCompleted: OpaqueStateValue - get() = Jsons.valueToTree(MysqlJdbcStreamStateValue(stateType = "primary_key")) + get() = Jsons.valueToTree(MySqlSourceJdbcStreamStateValue(stateType = "primary_key")) /** Value representing the progress of an ongoing incremental cursor read. */ fun cursorIncrementalCheckpoint( @@ -38,7 +38,7 @@ data class MysqlJdbcStreamStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( cursorField = listOf(cursor.id), cursors = cursorCheckpoint.asText(), streamName = stream.name, @@ -58,10 +58,10 @@ data class MysqlJdbcStreamStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( pkName = primaryKeyField.id, pkValue = primaryKeyCheckpoint.first().asText(), - stateType = StateType.PRIMARY_KEY.stateType, + stateType = StateType.PRIMARY_KEY.serialized, ) ) } @@ -79,13 +79,13 @@ data class MysqlJdbcStreamStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( pkName = primaryKeyField.id, pkValue = primaryKeyCheckpoint.first().asText(), - stateType = StateType.PRIMARY_KEY.stateType, + stateType = StateType.PRIMARY_KEY.serialized, incrementalState = Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( cursorField = listOf(cursor.id), streamName = stream.name, streamNamespace = stream.namespace!! @@ -96,9 +96,12 @@ data class MysqlJdbcStreamStateValue( } } } -} -enum class StateType(val stateType: String) { - PRIMARY_KEY("primary_key"), - CURSOR_BASED("cursor_based"), + enum class StateType { + PRIMARY_KEY, + CURSOR_BASED, + ; + + val serialized: String = name.lowercase() + } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceMetadataQuerier.kt similarity index 95% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceMetadataQuerier.kt index 1130fb8c26d5..54e0a2d6f146 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceMetadataQuerier.kt @@ -24,7 +24,7 @@ import java.sql.Statement private val log = KotlinLogging.logger {} /** Delegates to [JdbcMetadataQuerier] except for [fields]. */ -class MysqlSourceMetadataQuerier( +class MySqlSourceMetadataQuerier( val base: JdbcMetadataQuerier, ) : MetadataQuerier by base { @@ -116,7 +116,7 @@ class MysqlSourceMetadataQuerier( val results = mutableListOf() val schemas: List = streamNamespaces() val sql: String = PK_QUERY_FMTSTR.format(schemas.joinToString { "\'$it\'" }) - log.info { "Querying Mysql system tables for all primary keys for catalog discovery." } + log.info { "Querying MySQL system tables for all primary keys for catalog discovery." } try { // Get primary keys for the specified table base.conn.createStatement().use { stmt: Statement -> @@ -134,7 +134,7 @@ class MysqlSourceMetadataQuerier( } } } - log.info { "Discovered all primary keys in ${schemas.size} Mysql schema(s)." } + log.info { "Discovered all primary keys in ${schemas.size} MySQL schema(s)." } return@lazy results .groupBy { findTableName( @@ -163,7 +163,7 @@ class MysqlSourceMetadataQuerier( } .toMap() } catch (e: Exception) { - throw RuntimeException("Mysql primary key discovery query failed: ${e.message}", e) + throw RuntimeException("MySQL primary key discovery query failed: ${e.message}", e) } } @@ -200,7 +200,7 @@ class MysqlSourceMetadataQuerier( """ } - /** Mysql implementation of [MetadataQuerier.Factory]. */ + /** MySQL implementation of [MetadataQuerier.Factory]. */ @Singleton @Primary class Factory( @@ -208,9 +208,9 @@ class MysqlSourceMetadataQuerier( val selectQueryGenerator: SelectQueryGenerator, val fieldTypeMapper: JdbcMetadataQuerier.FieldTypeMapper, val checkQueries: JdbcCheckQueries, - ) : MetadataQuerier.Factory { + ) : MetadataQuerier.Factory { /** The [SourceConfiguration] is deliberately not injected in order to support tests. */ - override fun session(config: MysqlSourceConfiguration): MetadataQuerier { + override fun session(config: MySqlSourceConfiguration): MetadataQuerier { val jdbcConnectionFactory = JdbcConnectionFactory(config) val base = JdbcMetadataQuerier( @@ -221,7 +221,7 @@ class MysqlSourceMetadataQuerier( checkQueries, jdbcConnectionFactory, ) - return MysqlSourceMetadataQuerier(base) + return MySqlSourceMetadataQuerier(base) } } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt similarity index 93% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt index 1d01a1999750..83f5bee64a95 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt @@ -66,26 +66,24 @@ import io.airbyte.cdk.read.WhereClauseNode import io.airbyte.cdk.read.WhereNode import io.airbyte.cdk.read.cdc.DebeziumState import io.airbyte.cdk.util.Jsons -import io.airbyte.integrations.source.mysql.cdc.MySqlDebeziumOperations -import io.airbyte.integrations.source.mysql.cdc.MySqlPosition import io.micronaut.context.annotation.Primary import jakarta.inject.Singleton import java.time.OffsetDateTime @Singleton @Primary -class MysqlSourceOperations : +class MySqlSourceOperations : JdbcMetadataQuerier.FieldTypeMapper, SelectQueryGenerator, JdbcAirbyteStreamFactory { - override val globalCursor: MetaField = MysqlCdcMetaFields.CDC_CURSOR + override val globalCursor: MetaField = MySqlSourceCdcMetaFields.CDC_CURSOR override val globalMetaFields: Set = setOf( - MysqlCdcMetaFields.CDC_CURSOR, + MySqlSourceCdcMetaFields.CDC_CURSOR, CommonMetaField.CDC_UPDATED_AT, CommonMetaField.CDC_DELETED_AT, - MysqlCdcMetaFields.CDC_LOG_FILE, - MysqlCdcMetaFields.CDC_LOG_POS + MySqlSourceCdcMetaFields.CDC_LOG_FILE, + MySqlSourceCdcMetaFields.CDC_LOG_POS ) override fun decorateRecordData( @@ -99,21 +97,22 @@ class MysqlSourceOperations : CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp), ) recordData.set( - MysqlCdcMetaFields.CDC_LOG_POS.id, + MySqlSourceCdcMetaFields.CDC_LOG_POS.id, CdcIntegerMetaFieldType.jsonEncoder.encode(0), ) if (globalStateValue == null) { return } val debeziumState: DebeziumState = - MySqlDebeziumOperations.deserializeDebeziumState(globalStateValue) - val position: MySqlPosition = MySqlDebeziumOperations.position(debeziumState.offset) + MySqlSourceDebeziumOperations.deserializeDebeziumState(globalStateValue) + val position: MySqlSourceCdcPosition = + MySqlSourceDebeziumOperations.position(debeziumState.offset) recordData.set( - MysqlCdcMetaFields.CDC_LOG_FILE.id, + MySqlSourceCdcMetaFields.CDC_LOG_FILE.id, CdcStringMetaFieldType.jsonEncoder.encode(position.fileName), ) recordData.set( - MysqlCdcMetaFields.CDC_LOG_POS.id, + MySqlSourceCdcMetaFields.CDC_LOG_POS.id, CdcIntegerMetaFieldType.jsonEncoder.encode(position.position), ) } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSelectQuerier.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQuerier.kt similarity index 90% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSelectQuerier.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQuerier.kt index d311c1abb8d7..7617437e6b22 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSelectQuerier.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQuerier.kt @@ -15,7 +15,7 @@ import jakarta.inject.Singleton /** Mysql implementation of [JdbcSelectQuerier], which sets fetch size differently */ @Singleton @Primary -class MysqlSelectQuerier( +class MySqlSourceSelectQuerier( private val jdbcConnectionFactory: JdbcConnectionFactory, ) : SelectQuerier by JdbcSelectQuerier(jdbcConnectionFactory) { private val log = KotlinLogging.logger {} @@ -23,15 +23,15 @@ class MysqlSelectQuerier( override fun executeQuery( q: SelectQuery, parameters: SelectQuerier.Parameters, - ): SelectQuerier.Result = MysqlResult(jdbcConnectionFactory, q, parameters) + ): SelectQuerier.Result = MySqlResult(jdbcConnectionFactory, q, parameters) - inner class MysqlResult( + inner class MySqlResult( jdbcConnectionFactory: JdbcConnectionFactory, q: SelectQuery, parameters: SelectQuerier.Parameters, ) : JdbcSelectQuerier.Result(jdbcConnectionFactory, q, parameters) { /** - * Mysql does things differently with fetch size. Setting fetch size on a result set is + * MySQL does things differently with fetch size. Setting fetch size on a result set is * safer than on a statement. */ override fun initQueryExecution() { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySqlBooleanConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySqlBooleanConverter.kt deleted file mode 100644 index b505226f53aa..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySqlBooleanConverter.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql.cdc.converters - -import io.airbyte.cdk.read.cdc.Converted -import io.airbyte.cdk.read.cdc.NoConversion -import io.airbyte.cdk.read.cdc.PartialConverter -import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter -import org.apache.kafka.connect.data.SchemaBuilder - -class MySqlBooleanConverter : RelationalColumnCustomConverter { - - override val debeziumPropertiesKey: String = "boolean" - override val handlers: List = - listOf(booleanHandler, tinyint1Handler) - - companion object { - val booleanHandler = - RelationalColumnCustomConverter.Handler( - predicate = { it.typeName().startsWith("BOOL", ignoreCase = true) }, - outputSchema = SchemaBuilder.bool(), - partialConverters = - listOf( - PartialConverter { - when (it) { - null -> Converted(false) - is Boolean -> Converted(it) - else -> NoConversion - } - } - ) - ) - - val tinyint1Handler = - RelationalColumnCustomConverter.Handler( - predicate = { - it.typeName().equals("TINYINT", ignoreCase = true) && - it.length().isPresent && - it.length().asInt == 1 - }, - outputSchema = SchemaBuilder.bool(), - partialConverters = - listOf( - PartialConverter { - when (it) { - null -> Converted(false) - is Number -> Converted(it != 0) - else -> NoConversion - } - } - ) - ) - } -} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlContainerFactory.kt similarity index 83% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlContainerFactory.kt index 14f3e4ab555a..d67a632935c6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlContainerFactory.kt @@ -8,7 +8,7 @@ import org.testcontainers.containers.MySQLContainer import org.testcontainers.containers.Network import org.testcontainers.utility.DockerImageName -object MysqlContainerFactory { +object MySqlContainerFactory { const val COMPATIBLE_NAME = "mysql:8.0" private val log = KotlinLogging.logger {} @@ -16,16 +16,16 @@ object MysqlContainerFactory { TestContainerFactory.register(COMPATIBLE_NAME, ::MySQLContainer) } - sealed interface MysqlContainerModifier : + sealed interface MySqlContainerModifier : TestContainerFactory.ContainerModifier> - data object WithNetwork : MysqlContainerModifier { + data object WithNetwork : MySqlContainerModifier { override fun modify(container: MySQLContainer<*>) { container.withNetwork(Network.newNetwork()) } } - data object WithCdc : MysqlContainerModifier { + data object WithCdc : MySqlContainerModifier { override fun modify(container: MySQLContainer<*>) { container.start() container.execAsRoot(GTID_ON) @@ -44,7 +44,7 @@ object MysqlContainerFactory { "ON *.* TO '%s'@'%%';" } - data object WithCdcOff : MysqlContainerModifier { + data object WithCdcOff : MySqlContainerModifier { override fun modify(container: MySQLContainer<*>) { container.withCommand("--skip-log-bin") } @@ -52,7 +52,7 @@ object MysqlContainerFactory { fun exclusive( imageName: String, - vararg modifiers: MysqlContainerModifier, + vararg modifiers: MySqlContainerModifier, ): MySQLContainer<*> { val dockerImageName = DockerImageName.parse(imageName).asCompatibleSubstituteFor(COMPATIBLE_NAME) @@ -61,7 +61,7 @@ object MysqlContainerFactory { fun shared( imageName: String, - vararg modifiers: MysqlContainerModifier, + vararg modifiers: MySqlContainerModifier, ): MySQLContainer<*> { val dockerImageName = DockerImageName.parse(imageName).asCompatibleSubstituteFor(COMPATIBLE_NAME) @@ -69,8 +69,8 @@ object MysqlContainerFactory { } @JvmStatic - fun config(mySQLContainer: MySQLContainer<*>): MysqlSourceConfigurationSpecification = - MysqlSourceConfigurationSpecification().apply { + fun config(mySQLContainer: MySQLContainer<*>): MySqlSourceConfigurationSpecification = + MySqlSourceConfigurationSpecification().apply { host = mySQLContainer.host port = mySQLContainer.getMappedPort(MySQLContainer.MYSQL_PORT) username = mySQLContainer.username @@ -82,10 +82,6 @@ object MysqlContainerFactory { setMethodValue(UserDefinedCursor) } - @JvmStatic - fun cdcConfig(mySQLContainer: MySQLContainer<*>): MysqlSourceConfigurationSpecification = - config(mySQLContainer).also { it.setMethodValue(CdcCursor()) } - fun MySQLContainer<*>.execAsRoot(sql: String) { val cleanSql: String = sql.trim().removeSuffix(";") + ";" log.info { "Executing SQL as root: $cleanSql" } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt similarity index 88% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt index e74856b3be50..f172df4d01d6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt @@ -12,7 +12,7 @@ import io.airbyte.cdk.jdbc.IntFieldType import io.airbyte.cdk.jdbc.JdbcConnectionFactory import io.airbyte.cdk.jdbc.StringFieldType import io.airbyte.cdk.output.BufferingOutputConsumer -import io.airbyte.integrations.source.mysql.MysqlContainerFactory.execAsRoot +import io.airbyte.integrations.source.mysql.MySqlContainerFactory.execAsRoot import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage @@ -31,7 +31,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.testcontainers.containers.MySQLContainer -class MysqlCdcIntegrationTest { +class MySqlSourceCdcIntegrationTest { @Test fun testCheck() { @@ -43,19 +43,19 @@ class MysqlCdcIntegrationTest { AirbyteConnectionStatus.Status.SUCCEEDED ) - MysqlContainerFactory.exclusive( + MySqlContainerFactory.exclusive( imageName = "mysql:8.0", - MysqlContainerFactory.WithCdcOff, + MySqlContainerFactory.WithCdcOff, ) .use { nonCdcDbContainer -> { - val invalidConfig: MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(nonCdcDbContainer).apply { + val invalidConfig: MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(nonCdcDbContainer).apply { setMethodValue(CdcCursor()) } val nonCdcConnectionFactory = - JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(invalidConfig)) + JdbcConnectionFactory(MySqlSourceConfigurationFactory().make(invalidConfig)) provisionTestContainer(nonCdcDbContainer, nonCdcConnectionFactory) @@ -108,11 +108,11 @@ class MysqlCdcIntegrationTest { val log = KotlinLogging.logger {} lateinit var dbContainer: MySQLContainer<*> - fun config(): MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(dbContainer).apply { setMethodValue(CdcCursor()) } + fun config(): MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(dbContainer).apply { setMethodValue(CdcCursor()) } val connectionFactory: JdbcConnectionFactory by lazy { - JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(config())) + JdbcConnectionFactory(MySqlSourceConfigurationFactory().make(config())) } val configuredCatalog: ConfiguredAirbyteCatalog = run { @@ -123,12 +123,12 @@ class MysqlCdcIntegrationTest { columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)), primaryKeyColumnIDs = listOf(listOf("k")), ) - val stream: AirbyteStream = MysqlSourceOperations().createGlobal(discoveredStream) + val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream) val configuredStream: ConfiguredAirbyteStream = CatalogHelpers.toDefaultConfiguredStream(stream) .withSyncMode(SyncMode.INCREMENTAL) .withPrimaryKey(discoveredStream.primaryKeyColumnIDs) - .withCursorField(listOf(MysqlCdcMetaFields.CDC_CURSOR.id)) + .withCursorField(listOf(MySqlSourceCdcMetaFields.CDC_CURSOR.id)) ConfiguredAirbyteCatalog().withStreams(listOf(configuredStream)) } @@ -137,9 +137,9 @@ class MysqlCdcIntegrationTest { @Timeout(value = 300) fun startAndProvisionTestContainer() { dbContainer = - MysqlContainerFactory.exclusive( + MySqlContainerFactory.exclusive( imageName = "mysql:8.0", - MysqlContainerFactory.WithNetwork, + MySqlContainerFactory.WithNetwork, ) provisionTestContainer(dbContainer, connectionFactory) } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecificationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecificationTest.kt similarity index 90% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecificationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecificationTest.kt index ee38781a8307..51e41a392652 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecificationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecificationTest.kt @@ -13,9 +13,9 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @MicronautTest(environments = [Environment.TEST], rebuildContext = true) -class MysqlSourceConfigurationSpecificationTestTest { +class MySqlSourceConfigurationSpecificationTest { @Inject - lateinit var supplier: ConfigurationSpecificationSupplier + lateinit var supplier: ConfigurationSpecificationSupplier @Test fun testSchemaViolation() { @@ -25,7 +25,7 @@ class MysqlSourceConfigurationSpecificationTestTest { @Test @Property(name = "airbyte.connector.config.json", value = CONFIG_JSON) fun testJson() { - val pojo: MysqlSourceConfigurationSpecification = supplier.get() + val pojo: MySqlSourceConfigurationSpecification = supplier.get() Assertions.assertEquals("localhost", pojo.host) Assertions.assertEquals(12345, pojo.port) Assertions.assertEquals("FOO", pojo.username) @@ -41,10 +41,11 @@ class MysqlSourceConfigurationSpecificationTestTest { Assertions.assertEquals(60, pojo.checkpointTargetIntervalSeconds) Assertions.assertEquals(2, pojo.concurrency) } -} -const val CONFIG_JSON: String = - """ + companion object { + + const val CONFIG_JSON: String = + """ { "host": "localhost", "port": 12345, @@ -69,3 +70,5 @@ const val CONFIG_JSON: String = "concurrency": 2 } """ + } +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationTest.kt similarity index 81% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationTest.kt index 98fd548677fa..66294e7acb1f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationTest.kt @@ -15,14 +15,14 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @MicronautTest(environments = [Environment.TEST, AIRBYTE_CLOUD_ENV], rebuildContext = true) -class MysqlSourceConfigurationTest { +class MySqlSourceConfigurationTest { @Inject lateinit var pojoSupplier: - ConfigurationSpecificationSupplier + ConfigurationSpecificationSupplier @Inject lateinit var factory: - SourceConfigurationFactory + SourceConfigurationFactory @Test @Property(name = "airbyte.connector.config.host", value = "localhost") @@ -36,7 +36,7 @@ class MysqlSourceConfigurationTest { value = "theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar&" ) fun testParseJdbcParameters() { - val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get() + val pojo: MySqlSourceConfigurationSpecification = pojoSupplier.get() val config = factory.makeWithoutExceptionHandling(pojo) @@ -63,7 +63,7 @@ class MysqlSourceConfigurationTest { @Property(name = "airbyte.connector.config.password", value = "BAR") @Property(name = "airbyte.connector.config.database", value = "SYSTEM") fun testAirbyteCloudDeployment() { - val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get() + val pojo: MySqlSourceConfigurationSpecification = pojoSupplier.get() Assertions.assertThrows(ConfigErrorException::class.java) { factory.makeWithoutExceptionHandling(pojo) } @@ -72,7 +72,7 @@ class MysqlSourceConfigurationTest { @Test @Property(name = "airbyte.connector.config.json", value = CONFIG_V1) fun testParseConfigFromV1() { - val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get() + val pojo: MySqlSourceConfigurationSpecification = pojoSupplier.get() val config = factory.makeWithoutExceptionHandling(pojo) @@ -96,37 +96,11 @@ class MysqlSourceConfigurationTest { Assertions.assertTrue(config.sshTunnel is SshNoTunnelMethod) } -} -const val CONFIG: String = - """ -{ - "host": "localhost", - "port": 12345, - "username": "FOO", - "password": "BAR", - "database": "SYSTEM", - "ssl_mode": { - "mode": "preferred" - }, - "tunnel_method": { - "tunnel_method": "SSH_PASSWORD_AUTH", - "tunnel_host": "localhost", - "tunnel_port": 2222, - "tunnel_user": "sshuser", - "tunnel_user_password": "***" - }, - "replication_method": { - "method": "STANDARD" - }, - "checkpoint_target_interval_seconds": 60, - "jdbc_url_params": "theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar&", - "concurrency": 2 -} -""" + companion object { -const val CONFIG_V1: String = - """ + const val CONFIG_V1: String = + """ { "host": "localhost", "port": 12345, @@ -147,3 +121,5 @@ const val CONFIG_V1: String = } } """ + } +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCursorBasedIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt similarity index 96% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCursorBasedIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt index 2447c49adf5f..800e5a9d2786 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCursorBasedIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt @@ -32,7 +32,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.testcontainers.containers.MySQLContainer -class MysqlCursorBasedIntegrationTest { +class MySqlSourceCursorBasedIntegrationTest { @BeforeEach fun resetTable() { @@ -161,13 +161,13 @@ class MysqlCursorBasedIntegrationTest { companion object { val log = KotlinLogging.logger {} - val dbContainer: MySQLContainer<*> = MysqlContainerFactory.shared(imageName = "mysql:8.0") + val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:8.0") - val config: MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(dbContainer) + val config: MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(dbContainer) val connectionFactory: JdbcConnectionFactory by lazy { - JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(config)) + JdbcConnectionFactory(MySqlSourceConfigurationFactory().make(config)) } fun getConfiguredCatalog(): ConfiguredAirbyteCatalog { @@ -178,7 +178,7 @@ class MysqlCursorBasedIntegrationTest { columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)), primaryKeyColumnIDs = listOf(listOf("k")), ) - val stream: AirbyteStream = MysqlSourceOperations().createGlobal(discoveredStream) + val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream) val configuredStream: ConfiguredAirbyteStream = CatalogHelpers.toDefaultConfiguredStream(stream) .withSyncMode(SyncMode.INCREMENTAL) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlDatatypeIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt similarity index 78% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlDatatypeIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt index 625efd982bad..a0f9d18877c7 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlDatatypeIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt @@ -19,12 +19,12 @@ import org.junit.jupiter.api.TestFactory import org.junit.jupiter.api.Timeout import org.testcontainers.containers.MySQLContainer -class MySqlDatatypeIntegrationTest { +class MySqlSourceDatatypeIntegrationTest { @TestFactory @Timeout(300) fun syncTests(): Iterable = - DynamicDatatypeTestFactory(MySqlDatatypeTestOperations).build(dbContainer) + DynamicDatatypeTestFactory(MySqlSourceDatatypeTestOperations).build(dbContainer) companion object { @@ -34,38 +34,38 @@ class MySqlDatatypeIntegrationTest { @BeforeAll @Timeout(value = 300) fun startAndProvisionTestContainer() { - dbContainer = MysqlContainerFactory.shared("mysql:8.0", MysqlContainerFactory.WithCdc) + dbContainer = MySqlContainerFactory.shared("mysql:8.0", MySqlContainerFactory.WithCdc) } } } -object MySqlDatatypeTestOperations : +object MySqlSourceDatatypeTestOperations : DatatypeTestOperations< MySQLContainer<*>, - MysqlSourceConfigurationSpecification, - MysqlSourceConfiguration, - MysqlSourceConfigurationFactory, - MySqlDatatypeTestCase + MySqlSourceConfigurationSpecification, + MySqlSourceConfiguration, + MySqlSourceConfigurationFactory, + MySqlSourceDatatypeTestCase > { private val log = KotlinLogging.logger {} override val withGlobal: Boolean = true - override val globalCursorMetaField: MetaField = MysqlCdcMetaFields.CDC_CURSOR + override val globalCursorMetaField: MetaField = MySqlSourceCdcMetaFields.CDC_CURSOR override fun streamConfigSpec( container: MySQLContainer<*> - ): MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(container).also { it.setMethodValue(UserDefinedCursor) } + ): MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(container).also { it.setMethodValue(UserDefinedCursor) } override fun globalConfigSpec( container: MySQLContainer<*> - ): MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(container).also { it.setMethodValue(CdcCursor()) } + ): MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(container).also { it.setMethodValue(CdcCursor()) } - override val configFactory: MysqlSourceConfigurationFactory = MysqlSourceConfigurationFactory() + override val configFactory: MySqlSourceConfigurationFactory = MySqlSourceConfigurationFactory() - override fun createStreams(config: MysqlSourceConfiguration) { + override fun createStreams(config: MySqlSourceConfiguration) { JdbcConnectionFactory(config).get().use { connection: Connection -> connection.isReadOnly = false connection.createStatement().use { it.execute("CREATE DATABASE IF NOT EXISTS test") } @@ -79,7 +79,7 @@ object MySqlDatatypeTestOperations : } } - override fun populateStreams(config: MysqlSourceConfiguration) { + override fun populateStreams(config: MySqlSourceConfiguration) { JdbcConnectionFactory(config).get().use { connection: Connection -> connection.isReadOnly = false connection.createStatement().use { it.execute("USE test") } @@ -210,50 +210,50 @@ object MySqlDatatypeTestOperations : "X'89504E470D0A1A0A0000000D49484452'" to """"iVBORw0KGgoAAAANSUhEUg=="""", ) - override val testCases: Map = + override val testCases: Map = listOf( - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BOOLEAN", booleanValues, LeafAirbyteSchemaType.BOOLEAN, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "VARCHAR(10)", stringValues, LeafAirbyteSchemaType.STRING, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL(60,4)", bigDecimalValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL(60,0)", bigIntegerValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL(10,2)", decimalValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL(10,2) UNSIGNED", decimalValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL UNSIGNED", zeroPrecisionDecimalValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("FLOAT", floatValues, LeafAirbyteSchemaType.NUMBER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("FLOAT", floatValues, LeafAirbyteSchemaType.NUMBER), + MySqlSourceDatatypeTestCase( "FLOAT(34)", floatValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "FLOAT(7,4)", floatValues, LeafAirbyteSchemaType.NUMBER, @@ -262,97 +262,97 @@ object MySqlDatatypeTestOperations : // not strictly equal due to IEEE754 encoding artifacts, but acceptable. isGlobal = false, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "FLOAT(53,8)", doubleValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase("DOUBLE", decimalValues, LeafAirbyteSchemaType.NUMBER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("DOUBLE", decimalValues, LeafAirbyteSchemaType.NUMBER), + MySqlSourceDatatypeTestCase( "DOUBLE UNSIGNED", decimalValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "TINYINT", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "TINYINT UNSIGNED", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "SMALLINT", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "MEDIUMINT", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("BIGINT", intValues, LeafAirbyteSchemaType.INTEGER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("BIGINT", intValues, LeafAirbyteSchemaType.INTEGER), + MySqlSourceDatatypeTestCase( "SMALLINT UNSIGNED", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "MEDIUMINT UNSIGNED", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BIGINT UNSIGNED", intValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("INT", intValues, LeafAirbyteSchemaType.INTEGER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("INT", intValues, LeafAirbyteSchemaType.INTEGER), + MySqlSourceDatatypeTestCase( "INT UNSIGNED", intValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("DATE", dateValues, LeafAirbyteSchemaType.DATE), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("DATE", dateValues, LeafAirbyteSchemaType.DATE), + MySqlSourceDatatypeTestCase( "TIMESTAMP", timestampValues, LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DATETIME", dateTimeValues, LeafAirbyteSchemaType.TIMESTAMP_WITHOUT_TIMEZONE, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "TIME", timeValues, LeafAirbyteSchemaType.TIME_WITHOUT_TIMEZONE, ), - MySqlDatatypeTestCase("YEAR", yearValues, LeafAirbyteSchemaType.INTEGER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("YEAR", yearValues, LeafAirbyteSchemaType.INTEGER), + MySqlSourceDatatypeTestCase( "VARBINARY(255)", binaryValues, LeafAirbyteSchemaType.BINARY, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BIT", bitValues, LeafAirbyteSchemaType.BOOLEAN, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BIT(8)", multiBitValues, LeafAirbyteSchemaType.BINARY, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "JSON", jsonValues, LeafAirbyteSchemaType.JSONB, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "ENUM('a', 'b', 'c')", enumValues, LeafAirbyteSchemaType.STRING, @@ -361,7 +361,7 @@ object MySqlDatatypeTestOperations : .associateBy { it.id } } -data class MySqlDatatypeTestCase( +data class MySqlSourceDatatypeTestCase( val sqlType: String, val sqlToAirbyte: Map, override val expectedAirbyteSchemaType: AirbyteSchemaType, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt similarity index 83% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt index 4a3e7b16cb4c..a111d15f372f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt @@ -32,23 +32,22 @@ import io.airbyte.protocol.models.v0.StreamDescriptor import io.mockk.mockk import java.time.OffsetDateTime import java.util.Base64 -import kotlin.test.assertEquals import kotlin.test.assertNull import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test -class MysqlJdbcPartitionFactoryTest { +class MySqlSourceJdbcPartitionFactoryTest { companion object { - private val selectQueryGenerator = MysqlSourceOperations() + private val selectQueryGenerator = MySqlSourceOperations() private val sharedState = sharedState() private val cdcSharedState = sharedState(global = true) - private val config = mockk() + private val config = mockk() - val mysqlJdbcPartitionFactory = - MysqlJdbcPartitionFactory(sharedState, selectQueryGenerator, config) + val mySqlSourceJdbcPartitionFactory = + MySqlSourceJdbcPartitionFactory(sharedState, selectQueryGenerator, config) val mysqlCdcJdbcPartitionFactory = - MysqlJdbcPartitionFactory(cdcSharedState, selectQueryGenerator, config) + MySqlSourceJdbcPartitionFactory(cdcSharedState, selectQueryGenerator, config) val fieldId = Field("id", IntFieldType) val stream = @@ -109,7 +108,7 @@ class MysqlJdbcPartitionFactoryTest { ): DefaultJdbcSharedState { val configSpec = - MysqlSourceConfigurationSpecification().apply { + MySqlSourceConfigurationSpecification().apply { host = "" port = 0 username = "foo" @@ -121,7 +120,7 @@ class MysqlJdbcPartitionFactoryTest { } else { configSpec.setMethodValue(UserDefinedCursor) } - val configFactory = MysqlSourceConfigurationFactory() + val configFactory = MySqlSourceConfigurationFactory() val configuration = configFactory.make(configSpec) val mockSelectQuerier = mockk() @@ -168,14 +167,14 @@ class MysqlJdbcPartitionFactoryTest { @Test fun testColdStartWithPkCursorBased() { - val jdbcPartition = mysqlJdbcPartitionFactory.create(streamFeedBootstrap(stream)) - assertTrue(jdbcPartition is MysqlJdbcSnapshotWithCursorPartition) + val jdbcPartition = mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(stream)) + assertTrue(jdbcPartition is MySqlSourceJdbcSnapshotWithCursorPartition) } @Test fun testColdStartWithPkCdc() { val jdbcPartition = mysqlCdcJdbcPartitionFactory.create(streamFeedBootstrap(stream)) - assertTrue(jdbcPartition is MysqlJdbcCdcSnapshotPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCdcSnapshotPartition) } @Test @@ -191,8 +190,9 @@ class MysqlJdbcPartitionFactoryTest { configuredPrimaryKey = listOf(), configuredCursor = fieldId, ) - val jdbcPartition = mysqlJdbcPartitionFactory.create(streamFeedBootstrap(streamWithoutPk)) - assertTrue(jdbcPartition is MysqlJdbcNonResumableSnapshotWithCursorPartition) + val jdbcPartition = + mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(streamWithoutPk)) + assertTrue(jdbcPartition is MySqlSourceJdbcNonResumableSnapshotWithCursorPartition) } @Test @@ -215,8 +215,8 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) } @Test @@ -239,14 +239,14 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create( + mySqlSourceJdbcPartitionFactory.create( streamFeedBootstrap(timestampStream, incomingStateValue) ) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) assertEquals( Jsons.valueToTree("2025-09-02T05:23:35.000000Z"), - (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound + (jdbcPartition as MySqlSourceJdbcCursorIncrementalPartition).cursorLowerBound ) } @@ -270,14 +270,14 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create( + mySqlSourceJdbcPartitionFactory.create( streamFeedBootstrap(datetimeStream, incomingStateValue) ) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) assertEquals( Jsons.valueToTree("2024-11-21T11:59:57.123000"), - (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound + (jdbcPartition as MySqlSourceJdbcCursorIncrementalPartition).cursorLowerBound ) } @@ -297,9 +297,9 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) + mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcSnapshotWithCursorPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcSnapshotWithCursorPartition) } @Test @@ -319,7 +319,7 @@ class MysqlJdbcPartitionFactoryTest { val jdbcPartition = mysqlCdcJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcCdcSnapshotPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCdcSnapshotPartition) } @Test @@ -360,12 +360,14 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create(streamFeedBootstrap(binaryStream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + mySqlSourceJdbcPartitionFactory.create( + streamFeedBootstrap(binaryStream, incomingStateValue) + ) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) assertEquals( Jsons.valueToTree(Base64.getDecoder().decode("OQAAAAAAAAAAAAAAAAAAAA==")), - (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound + (jdbcPartition as MySqlSourceJdbcCursorIncrementalPartition).cursorLowerBound ) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceSelectQueryGeneratorTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQueryGeneratorTest.kt similarity index 97% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceSelectQueryGeneratorTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQueryGeneratorTest.kt index 14fb19b67917..226ba557234e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceSelectQueryGeneratorTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQueryGeneratorTest.kt @@ -27,7 +27,7 @@ import io.airbyte.cdk.util.Jsons import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test -class MysqlSourceSelectQueryGeneratorTest { +class MySqlSourceSelectQueryGeneratorTest { @Test fun testSelectLimit0() { SelectQuerySpec( @@ -139,7 +139,7 @@ class MysqlSourceSelectQueryGeneratorTest { select.columns, bindings.map { SelectQuery.Binding(it.first, it.second) }, ) - val actual: SelectQuery = MysqlSourceOperations().generate(this.optimize()) + val actual: SelectQuery = MySqlSourceOperations().generate(this.optimize()) Assertions.assertEquals(expected, actual) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSpecIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSpecIntegrationTest.kt similarity index 87% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSpecIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSpecIntegrationTest.kt index 63a29db649b6..bd77db88b100 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSpecIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSpecIntegrationTest.kt @@ -4,7 +4,7 @@ package io.airbyte.integrations.source.mysql import io.airbyte.cdk.command.SyncsTestFixture import org.junit.jupiter.api.Test -class MysqlSpecIntegrationTest { +class MySqlSourceSpecIntegrationTest { @Test fun testSpec() { SyncsTestFixture.testSpec("expected-spec.json") diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceTestConfigurationFactory.kt similarity index 73% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceTestConfigurationFactory.kt index 60930cb82823..ef6621958f59 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceTestConfigurationFactory.kt @@ -12,12 +12,12 @@ import java.time.Duration @Singleton @Requires(env = [Environment.TEST]) @Primary -class MysqlSourceTestConfigurationFactory(val featureFlags: Set) : - SourceConfigurationFactory { +class MySqlSourceTestConfigurationFactory(val featureFlags: Set) : + SourceConfigurationFactory { override fun makeWithoutExceptionHandling( - pojo: MysqlSourceConfigurationSpecification, - ): MysqlSourceConfiguration = - MysqlSourceConfigurationFactory(featureFlags) + pojo: MySqlSourceConfigurationSpecification, + ): MySqlSourceConfiguration = + MySqlSourceConfigurationFactory(featureFlags) .makeWithoutExceptionHandling(pojo) .copy( maxConcurrency = 1, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json index 329b2434bd72..f7518edc66a7 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json @@ -2,7 +2,7 @@ "documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql", "connectionSpecification": { "type": "object", - "title": "Mysql Source Spec", + "title": "MySQL Source Spec", "$schema": "http://json-schema.org/draft-07/schema#", "required": ["host", "port", "database", "username", "replication_method"], "properties": { @@ -365,7 +365,7 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard." } }, - "description": "Recommended - Incrementally reads new inserts, updates, and deletes using Mysql's change data capture feature. This must be enabled on your database.", + "description": "Recommended - Incrementally reads new inserts, updates, and deletes using MySQL's change data capture feature. This must be enabled on your database.", "additionalProperties": true } ],