diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 8bf6b20e09da..3a5e4143d295 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -3,21 +3,19 @@ plugins { } application { - mainClass = 'io.airbyte.integrations.source.mysql.MysqlSource' + mainClass = 'io.airbyte.integrations.source.mysql.MySqlSource' } airbyteBulkConnector { core = 'extract' toolkits = ['extract-jdbc', 'extract-cdc'] - cdk = 'local' + cdk = '0.226' } dependencies { implementation 'com.mysql:mysql-connector-j:9.1.0' - implementation 'org.codehaus.plexus:plexus-utils:4.0.0' implementation 'io.debezium:debezium-connector-mysql' - testImplementation platform('org.testcontainers:testcontainers-bom:1.20.2') testImplementation 'org.testcontainers:mysql' testImplementation("io.mockk:mockk:1.12.0") } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 1664f0159176..f2bff2066105 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -1,7 +1,7 @@ data: ab_internal: - ql: 200 - sl: 0 + ql: 400 + sl: 300 allowedHosts: hosts: - ${host} @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.9.3 + dockerImageTag: 3.9.4 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSource.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSource.kt similarity index 92% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSource.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSource.kt index c46a2c053471..29caa56c7aaf 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSource.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSource.kt @@ -3,7 +3,7 @@ package io.airbyte.integrations.source.mysql import io.airbyte.cdk.AirbyteSourceRunner -object MysqlSource { +object MySqlSource { @JvmStatic fun main(args: Array) { AirbyteSourceRunner.run(*args) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLBooleanConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt similarity index 91% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLBooleanConverter.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt index dabeb49ee611..467645fa6b96 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLBooleanConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt @@ -2,14 +2,14 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc.converters +package io.airbyte.integrations.source.mysql import io.debezium.spi.converter.CustomConverter import io.debezium.spi.converter.RelationalColumn import java.util.* import org.apache.kafka.connect.data.SchemaBuilder -class MySQLBooleanConverter : CustomConverter { +class MySqlSourceCdcBooleanConverter : CustomConverter { override fun configure(props: Properties?) {} private val BOOLEAN_TYPES = arrayOf("BOOLEAN", "BOOL", "TINYINT") diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcInitialSnapshotStateValue.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcInitialSnapshotStateValue.kt similarity index 91% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcInitialSnapshotStateValue.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcInitialSnapshotStateValue.kt index 55f3c10a8db4..621966c080a5 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcInitialSnapshotStateValue.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcInitialSnapshotStateValue.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.discover.Field import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons -data class MysqlCdcInitialSnapshotStateValue( +data class MySqlSourceCdcInitialSnapshotStateValue( @JsonProperty("pk_val") val pkVal: String? = null, @JsonProperty("pk_name") val pkName: String? = null, @JsonProperty("version") val version: Int? = null, @@ -25,7 +25,7 @@ data class MysqlCdcInitialSnapshotStateValue( /** Value representing the completion of a FULL_REFRESH snapshot. */ fun getSnapshotCompletedState(stream: Stream): OpaqueStateValue = Jsons.valueToTree( - MysqlCdcInitialSnapshotStateValue( + MySqlSourceCdcInitialSnapshotStateValue( streamName = stream.name, cursorField = listOf(), streamNamespace = stream.namespace @@ -42,7 +42,7 @@ data class MysqlCdcInitialSnapshotStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlCdcInitialSnapshotStateValue( + MySqlSourceCdcInitialSnapshotStateValue( pkName = primaryKeyField.id, pkVal = primaryKeyCheckpoint.first().asText(), stateType = "primary_key", diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcMetaFields.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcMetaFields.kt similarity index 93% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcMetaFields.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcMetaFields.kt index d8f4ad6de14a..c079bb0c0202 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcMetaFields.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcMetaFields.kt @@ -9,7 +9,7 @@ import io.airbyte.cdk.discover.CdcStringMetaFieldType import io.airbyte.cdk.discover.FieldType import io.airbyte.cdk.discover.MetaField -enum class MysqlCdcMetaFields( +enum class MySqlSourceCdcMetaFields( override val type: FieldType, ) : MetaField { CDC_CURSOR(CdcIntegerMetaFieldType), diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLNumbericConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcNumericConverter.kt similarity index 91% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLNumbericConverter.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcNumericConverter.kt index 0764f6a06ade..359aa28dc6e8 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLNumbericConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcNumericConverter.kt @@ -2,14 +2,14 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc.converters +package io.airbyte.integrations.source.mysql import io.debezium.spi.converter.CustomConverter import io.debezium.spi.converter.RelationalColumn import java.util.* import org.apache.kafka.connect.data.SchemaBuilder -class MySQLNumericConverter : CustomConverter { +class MySqlSourceCdcNumericConverter : CustomConverter { override fun configure(props: Properties?) {} private val NUMERIC_TYPES = arrayOf("FLOAT", "DOUBLE", "DECIMAL") diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlPosition.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcPosition.kt similarity index 80% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlPosition.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcPosition.kt index bcb7abefbcfd..13598aba4b89 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlPosition.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcPosition.kt @@ -2,13 +2,14 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc +package io.airbyte.integrations.source.mysql import kotlin.io.path.Path import kotlin.io.path.extension /** WAL position datum for MySQL. */ -data class MySqlPosition(val fileName: String, val position: Long) : Comparable { +data class MySqlSourceCdcPosition(val fileName: String, val position: Long) : + Comparable { /** * Numerical value encoded in the extension of the binlog file name. @@ -31,5 +32,6 @@ data class MySqlPosition(val fileName: String, val position: Long) : Comparable< val cursorValue: Long get() = (fileExtension.toLong() shl Int.SIZE_BITS) or position - override fun compareTo(other: MySqlPosition): Int = cursorValue.compareTo(other.cursorValue) + override fun compareTo(other: MySqlSourceCdcPosition): Int = + cursorValue.compareTo(other.cursorValue) } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLDateTimeConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt similarity index 96% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLDateTimeConverter.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt index 7125ec9d35a1..83758c7fef14 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/converters/MySQLDateTimeConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc.converters +package io.airbyte.integrations.source.mysql import io.airbyte.cdk.jdbc.converters.DateTimeConverter import io.debezium.spi.converter.CustomConverter @@ -24,7 +24,7 @@ import org.apache.kafka.connect.data.SchemaBuilder * MySqlCdcProperties#commonProperties(JdbcDatabase)} (If you don't rename, a test would still fail * but it might be tricky to figure out where to change the property name) */ -class MySQLDateTimeConverter : CustomConverter { +class MySqlSourceCdcTemporalConverter : CustomConverter { private val DATE_TYPES = arrayOf("DATE", "DATETIME", "TIME", "TIMESTAMP") override fun configure(props: Properties?) {} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfiguration.kt similarity index 87% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfiguration.kt index b5c50a5b237d..7e342d020925 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfiguration.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfiguration.kt @@ -21,8 +21,8 @@ import java.time.Duration private val log = KotlinLogging.logger {} -/** Mysql-specific implementation of [SourceConfiguration] */ -data class MysqlSourceConfiguration( +/** MySQL-specific implementation of [SourceConfiguration] */ +data class MySqlSourceConfiguration( override val realHost: String, override val realPort: Int, override val sshTunnel: SshTunnelMethodConfiguration?, @@ -41,16 +41,16 @@ data class MysqlSourceConfiguration( ) : JdbcSourceConfiguration, CdcSourceConfiguration { override val global = incrementalConfiguration is CdcIncrementalConfiguration - /** Required to inject [MysqlSourceConfiguration] directly. */ + /** Required to inject [MySqlSourceConfiguration] directly. */ @Factory private class MicronautFactory { @Singleton fun mysqlSourceConfig( factory: SourceConfigurationFactory< - MysqlSourceConfigurationSpecification, MysqlSourceConfiguration>, - supplier: ConfigurationSpecificationSupplier, - ): MysqlSourceConfiguration = factory.make(supplier.get()) + MySqlSourceConfigurationSpecification, MySqlSourceConfiguration>, + supplier: ConfigurationSpecificationSupplier, + ): MySqlSourceConfiguration = factory.make(supplier.get()) } } @@ -71,14 +71,14 @@ enum class InvalidCdcCursorPositionBehavior { } @Singleton -class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set) : - SourceConfigurationFactory { +class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set) : + SourceConfigurationFactory { constructor() : this(emptySet()) override fun makeWithoutExceptionHandling( - pojo: MysqlSourceConfigurationSpecification, - ): MysqlSourceConfiguration { + pojo: MySqlSourceConfigurationSpecification, + ): MySqlSourceConfiguration { val realHost: String = pojo.host val realPort: Int = pojo.port val sshTunnel: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue() @@ -115,20 +115,21 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set< "SSL encryption or an SSH tunnel." ) } - MysqlJdbcEncryption(sslMode = SSLMode.PREFERRED) + MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.PREFERRED) } - is EncryptionRequired -> MysqlJdbcEncryption(sslMode = SSLMode.REQUIRED) + is EncryptionRequired -> + MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.REQUIRED) is SslVerifyCertificate -> - MysqlJdbcEncryption( - sslMode = SSLMode.VERIFY_CA, + MySqlSourceEncryption( + sslMode = MySqlSourceEncryption.SslMode.VERIFY_CA, caCertificate = encryption.sslCertificate, clientCertificate = encryption.sslClientCertificate, clientKey = encryption.sslClientKey, clientKeyPassword = encryption.sslClientPassword ) is SslVerifyIdentity -> - MysqlJdbcEncryption( - sslMode = SSLMode.VERIFY_IDENTITY, + MySqlSourceEncryption( + sslMode = MySqlSourceEncryption.SslMode.VERIFY_IDENTITY, caCertificate = encryption.sslCertificate, clientCertificate = encryption.sslClientCertificate, clientKey = encryption.sslClientKey, @@ -178,7 +179,7 @@ class MysqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set< }, ) } - return MysqlSourceConfiguration( + return MySqlSourceConfiguration( realHost = realHost, realPort = realPort, sshTunnel = sshTunnel, diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecification.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecification.kt similarity index 98% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecification.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecification.kt index f2741de20053..1c20cd6de425 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecification.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecification.kt @@ -27,20 +27,20 @@ import io.micronaut.context.annotation.ConfigurationProperties import jakarta.inject.Singleton /** - * The object which is mapped to the Mysql source configuration JSON. + * The object which is mapped to the MySQL source configuration JSON. * - * Use [MysqlSourceConfiguration] instead wherever possible. This object also allows injecting + * Use [MySqlSourceConfiguration] instead wherever possible. This object also allows injecting * values through Micronaut properties, this is made possible by the classes named * `MicronautPropertiesFriendly.*`. */ -@JsonSchemaTitle("Mysql Source Spec") +@JsonSchemaTitle("MySQL Source Spec") @JsonPropertyOrder( value = ["host", "port", "database", "username", "replication_method"], ) @Singleton @ConfigurationProperties(CONNECTOR_CONFIG_PREFIX) @SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI") -class MysqlSourceConfigurationSpecification : ConfigurationSpecification() { +class MySqlSourceConfigurationSpecification : ConfigurationSpecification() { @JsonProperty("host") @JsonSchemaTitle("Host") @JsonSchemaInject(json = """{"order":1}""") @@ -320,7 +320,7 @@ data object UserDefinedCursor : CursorMethodConfiguration @JsonSchemaTitle("Read Changes using Change Data Capture (CDC)") @JsonSchemaDescription( "Recommended - " + - "Incrementally reads new inserts, updates, and deletes using Mysql's change data capture feature. This must be enabled on your database.", ) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt similarity index 89% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt index 97d43c0698a8..c10119684631 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mysql.cdc +package io.airbyte.integrations.source.mysql import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ArrayNode @@ -30,13 +30,6 @@ import io.airbyte.cdk.read.cdc.DebeziumState import io.airbyte.cdk.read.cdc.DeserializedRecord import io.airbyte.cdk.ssh.TunnelSession import io.airbyte.cdk.util.Jsons -import io.airbyte.integrations.source.mysql.CdcIncrementalConfiguration -import io.airbyte.integrations.source.mysql.InvalidCdcCursorPositionBehavior -import io.airbyte.integrations.source.mysql.MysqlCdcMetaFields -import io.airbyte.integrations.source.mysql.MysqlSourceConfiguration -import io.airbyte.integrations.source.mysql.cdc.converters.MySQLBooleanConverter -import io.airbyte.integrations.source.mysql.cdc.converters.MySQLDateTimeConverter -import io.airbyte.integrations.source.mysql.cdc.converters.MySQLNumericConverter import io.debezium.connector.mysql.MySqlConnector import io.debezium.connector.mysql.gtid.MySqlGtidSet import io.debezium.document.DocumentReader @@ -61,11 +54,11 @@ import org.apache.kafka.connect.source.SourceRecord import org.apache.mina.util.Base64 @Singleton -class MySqlDebeziumOperations( +class MySqlSourceDebeziumOperations( val jdbcConnectionFactory: JdbcConnectionFactory, - val configuration: MysqlSourceConfiguration, + val configuration: MySqlSourceConfiguration, random: Random = Random.Default, -) : DebeziumOperations { +) : DebeziumOperations { private val log = KotlinLogging.logger {} override fun deserialize( @@ -94,11 +87,20 @@ class MySqlDebeziumOperations( if (isDelete) transactionTimestampJsonNode else Jsons.nullNode(), ) // Set _ab_cdc_log_file and _ab_cdc_log_pos meta-field values. - val position = MySqlPosition(source["file"].asText(), source["pos"].asLong()) - data.set(MysqlCdcMetaFields.CDC_LOG_FILE.id, TextCodec.encode(position.fileName)) - data.set(MysqlCdcMetaFields.CDC_LOG_POS.id, LongCodec.encode(position.position)) + val position = MySqlSourceCdcPosition(source["file"].asText(), source["pos"].asLong()) + data.set( + MySqlSourceCdcMetaFields.CDC_LOG_FILE.id, + TextCodec.encode(position.fileName) + ) + data.set( + MySqlSourceCdcMetaFields.CDC_LOG_POS.id, + LongCodec.encode(position.position) + ) // Set the _ab_cdc_cursor meta-field value. - data.set(MysqlCdcMetaFields.CDC_CURSOR.id, LongCodec.encode(position.cursorValue)) + data.set( + MySqlSourceCdcMetaFields.CDC_CURSOR.id, + LongCodec.encode(position.cursorValue) + ) // Return a DeserializedRecord instance. return DeserializedRecord(data, changes = emptyMap()) } @@ -117,7 +119,7 @@ class MySqlDebeziumOperations( */ private fun validate(debeziumState: DebeziumState): CdcStateValidateResult { val savedStateOffset: SavedOffset = parseSavedOffset(debeziumState) - val (_: MySqlPosition, gtidSet: String?) = queryPositionAndGtids() + val (_: MySqlSourceCdcPosition, gtidSet: String?) = queryPositionAndGtids() if (gtidSet.isNullOrEmpty() && !savedStateOffset.gtidSet.isNullOrEmpty()) { log.info { "Connector used GTIDs previously, but MySQL server does not know of any GTIDs or they are not enabled" @@ -183,12 +185,12 @@ class MySqlDebeziumOperations( } private fun parseSavedOffset(debeziumState: DebeziumState): SavedOffset { - val position: MySqlPosition = position(debeziumState.offset) + val position: MySqlSourceCdcPosition = position(debeziumState.offset) val gtidSet: String? = debeziumState.offset.wrapped.values.first()["gtids"]?.asText() return SavedOffset(position, gtidSet) } - data class SavedOffset(val position: MySqlPosition, val gtidSet: String?) + data class SavedOffset(val position: MySqlSourceCdcPosition, val gtidSet: String?) enum class CdcStateValidateResult { VALID, @@ -196,23 +198,25 @@ class MySqlDebeziumOperations( INVALID_RESET } - override fun position(offset: DebeziumOffset): MySqlPosition = Companion.position(offset) + override fun position(offset: DebeziumOffset): MySqlSourceCdcPosition = + Companion.position(offset) - override fun position(recordValue: DebeziumRecordValue): MySqlPosition? { + override fun position(recordValue: DebeziumRecordValue): MySqlSourceCdcPosition? { val file: JsonNode = recordValue.source["file"]?.takeIf { it.isTextual } ?: return null val pos: JsonNode = recordValue.source["pos"]?.takeIf { it.isIntegralNumber } ?: return null - return MySqlPosition(file.asText(), pos.asLong()) + return MySqlSourceCdcPosition(file.asText(), pos.asLong()) } - override fun position(sourceRecord: SourceRecord): MySqlPosition? { + override fun position(sourceRecord: SourceRecord): MySqlSourceCdcPosition? { val offset: Map = sourceRecord.sourceOffset() val file: Any = offset["file"] ?: return null val pos: Long = offset["pos"] as? Long ?: return null - return MySqlPosition(file.toString(), pos) + return MySqlSourceCdcPosition(file.toString(), pos) } override fun synthesize(): DebeziumInput { - val (mySqlPosition: MySqlPosition, gtidSet: String?) = queryPositionAndGtids() + val (mySqlSourceCdcPosition: MySqlSourceCdcPosition, gtidSet: String?) = + queryPositionAndGtids() val topicPrefixName: String = DebeziumPropertiesBuilder.sanitizeTopicPrefix(databaseName) val timestamp: Instant = Instant.now() val key: ArrayNode = @@ -223,8 +227,8 @@ class MySqlDebeziumOperations( val value: ObjectNode = Jsons.objectNode().apply { put("ts_sec", timestamp.epochSecond) - put("file", mySqlPosition.fileName) - put("pos", mySqlPosition.position) + put("file", mySqlSourceCdcPosition.fileName) + put("pos", mySqlSourceCdcPosition.position) if (gtidSet != null) { put("gtids", gtidSet) } @@ -235,7 +239,7 @@ class MySqlDebeziumOperations( return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - private fun queryPositionAndGtids(): Pair { + private fun queryPositionAndGtids(): Pair { val file = Field("File", StringFieldType) val pos = Field("Position", LongFieldType) val gtids = Field("Executed_Gtid_Set", StringFieldType) @@ -244,8 +248,8 @@ class MySqlDebeziumOperations( val sql = "SHOW MASTER STATUS" stmt.executeQuery(sql).use { rs: ResultSet -> if (!rs.next()) throw ConfigErrorException("No results for query: $sql") - val mySqlPosition = - MySqlPosition( + val mySqlSourceCdcPosition = + MySqlSourceCdcPosition( fileName = rs.getString(file.id)?.takeUnless { rs.wasNull() } ?: throw ConfigErrorException( "No value for ${file.id} in: $sql", @@ -257,7 +261,7 @@ class MySqlDebeziumOperations( ) if (rs.metaData.columnCount <= 4) { // This value exists only in MySQL 5.6.5 or later. - return mySqlPosition to null + return mySqlSourceCdcPosition to null } val gtidSet: String? = rs.getString(gtids.id) @@ -265,7 +269,7 @@ class MySqlDebeziumOperations( ?.trim() ?.replace("\n", "") ?.replace("\r", "") - return mySqlPosition to gtidSet + return mySqlSourceCdcPosition to gtidSet } } } @@ -285,7 +289,7 @@ class MySqlDebeziumOperations( } private fun getBinaryLogFileNames(): List { - // Very old Mysql version (4.x) has different output of SHOW BINARY LOGS output. + // Very old MySQL version (4.x) has different output of SHOW BINARY LOGS output. return jdbcConnectionFactory.get().use { connection: Connection -> connection.createStatement().use { stmt: Statement -> val sql = "SHOW BINARY LOGS" @@ -398,11 +402,10 @@ class MySqlDebeziumOperations( .with("converters", "datetime,numeric,boolean") .with( "datetime.type", - MySQLDateTimeConverter::class.java.getName(), + MySqlSourceCdcTemporalConverter::class.java.getName(), ) - .with("numeric.type", MySQLNumericConverter::class.java.getName()) - .with("boolean.type", MySQLBooleanConverter::class.java.getName()) - + .with("numeric.type", MySqlSourceCdcNumericConverter::class.java.getName()) + .with("boolean.type", MySqlSourceCdcBooleanConverter::class.java.getName()) val serverTimezone: String? = (configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone if (!serverTimezone.isNullOrBlank()) { @@ -483,12 +486,12 @@ class MySqlDebeziumOperations( return DebeziumState(offset, DebeziumSchemaHistory(schemaHistoryList)) } - internal fun position(offset: DebeziumOffset): MySqlPosition { + internal fun position(offset: DebeziumOffset): MySqlSourceCdcPosition { if (offset.wrapped.size != 1) { throw ConfigErrorException("Expected exactly 1 key in $offset") } val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode - return MySqlPosition(offsetValue["file"].asText(), offsetValue["pos"].asLong()) + return MySqlSourceCdcPosition(offsetValue["file"].asText(), offsetValue["pos"].asLong()) } } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceEncryption.kt similarity index 89% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceEncryption.kt index f7bf5a456144..c77ece2130da 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceEncryption.kt @@ -5,7 +5,6 @@ package io.airbyte.integrations.source.mysql import io.airbyte.cdk.ConfigErrorException -import io.airbyte.cdk.SystemErrorException import io.airbyte.cdk.jdbc.SSLCertificateUtils import io.github.oshai.kotlinlogging.KotlinLogging import java.net.MalformedURLException @@ -15,22 +14,68 @@ import java.util.UUID private val log = KotlinLogging.logger {} -class MysqlJdbcEncryption( - val sslMode: SSLMode = SSLMode.PREFERRED, +class MySqlSourceEncryption( + val sslMode: SslMode = SslMode.PREFERRED, val caCertificate: String? = null, val clientCertificate: String? = null, val clientKey: String? = null, val clientKeyPassword: String? = null, ) { - companion object { - const val TRUST_KEY_STORE_URL: String = "trustCertificateKeyStoreUrl" - const val TRUST_KEY_STORE_PASS: String = "trustCertificateKeyStorePassword" - const val CLIENT_KEY_STORE_URL: String = "clientCertificateKeyStoreUrl" - const val CLIENT_KEY_STORE_PASS: String = "clientCertificateKeyStorePassword" - const val CLIENT_KEY_STORE_TYPE: String = "clientCertificateKeyStoreType" - const val TRUST_KEY_STORE_TYPE: String = "trustCertificateKeyStoreType" - const val KEY_STORE_TYPE_PKCS12: String = "PKCS12" - const val SSL_MODE: String = "sslMode" + + /** + * Enum representing the SSL mode for MySQL connections. The actual jdbc property name is the + * lower case of the enum name. + */ + enum class SslMode { + PREFERRED, + REQUIRED, + VERIFY_CA, + VERIFY_IDENTITY, + } + + fun parseSSLConfig(): Map { + var caCertKeyStorePair: Pair? + var clientCertKeyStorePair: Pair? + val additionalParameters: MutableMap = mutableMapOf() + + additionalParameters[SSL_MODE] = sslMode.name.lowercase() + + caCertKeyStorePair = prepareCACertificateKeyStore() + + if (null != caCertKeyStorePair) { + log.debug { "uri for ca cert keystore: ${caCertKeyStorePair.first}" } + try { + additionalParameters.putAll( + mapOf( + TRUST_KEY_STORE_URL to caCertKeyStorePair.first.toURL().toString(), + TRUST_KEY_STORE_PASS to caCertKeyStorePair.second, + TRUST_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 + ) + ) + } catch (e: MalformedURLException) { + throw ConfigErrorException("Unable to get a URL for trust key store") + } + + clientCertKeyStorePair = prepareClientCertificateKeyStore() + + if (null != clientCertKeyStorePair) { + log.debug { + "uri for client cert keystore: ${clientCertKeyStorePair.first} / ${clientCertKeyStorePair.second}" + } + try { + additionalParameters.putAll( + mapOf( + CLIENT_KEY_STORE_URL to clientCertKeyStorePair.first.toURL().toString(), + CLIENT_KEY_STORE_PASS to clientCertKeyStorePair.second, + CLIENT_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 + ) + ) + } catch (e: MalformedURLException) { + throw ConfigErrorException("Unable to get a URL for client key store") + } + } + } + return additionalParameters } private fun getOrGeneratePassword(): String { @@ -85,67 +130,14 @@ class MysqlJdbcEncryption( return clientCertKeyStorePair } - fun parseSSLConfig(): Map { - var caCertKeyStorePair: Pair? - var clientCertKeyStorePair: Pair? - val additionalParameters: MutableMap = mutableMapOf() - - additionalParameters[SSL_MODE] = sslMode.name.lowercase() - - caCertKeyStorePair = prepareCACertificateKeyStore() - - if (null != caCertKeyStorePair) { - log.debug { "uri for ca cert keystore: ${caCertKeyStorePair.first}" } - try { - additionalParameters.putAll( - mapOf( - TRUST_KEY_STORE_URL to caCertKeyStorePair.first.toURL().toString(), - TRUST_KEY_STORE_PASS to caCertKeyStorePair.second, - TRUST_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 - ) - ) - } catch (e: MalformedURLException) { - throw ConfigErrorException("Unable to get a URL for trust key store") - } - - clientCertKeyStorePair = prepareClientCertificateKeyStore() - - if (null != clientCertKeyStorePair) { - log.debug { - "uri for client cert keystore: ${clientCertKeyStorePair.first} / ${clientCertKeyStorePair.second}" - } - try { - additionalParameters.putAll( - mapOf( - CLIENT_KEY_STORE_URL to clientCertKeyStorePair.first.toURL().toString(), - CLIENT_KEY_STORE_PASS to clientCertKeyStorePair.second, - CLIENT_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12 - ) - ) - } catch (e: MalformedURLException) { - throw ConfigErrorException("Unable to get a URL for client key store") - } - } - } - return additionalParameters - } -} - -/** - * Enum representing the SSL mode for MySQL connections. The actual jdbc property name is the lower - * case of the enum name. - */ -enum class SSLMode() { - PREFERRED, - REQUIRED, - VERIFY_CA, - VERIFY_IDENTITY; - companion object { - - fun fromJdbcPropertyName(jdbcPropertyName: String): SSLMode { - return SSLMode.values().find { it.name.equals(jdbcPropertyName, ignoreCase = true) } - ?: throw SystemErrorException("Unknown SSL mode: $jdbcPropertyName") - } + const val TRUST_KEY_STORE_URL: String = "trustCertificateKeyStoreUrl" + const val TRUST_KEY_STORE_PASS: String = "trustCertificateKeyStorePassword" + const val CLIENT_KEY_STORE_URL: String = "clientCertificateKeyStoreUrl" + const val CLIENT_KEY_STORE_PASS: String = "clientCertificateKeyStorePassword" + const val CLIENT_KEY_STORE_TYPE: String = "clientCertificateKeyStoreType" + const val TRUST_KEY_STORE_TYPE: String = "trustCertificateKeyStoreType" + const val KEY_STORE_TYPE_PKCS12: String = "PKCS12" + const val SSL_MODE: String = "sslMode" } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartition.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartition.kt similarity index 85% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartition.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartition.kt index 2e9191cc37c4..ecb94853c552 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartition.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartition.kt @@ -37,7 +37,7 @@ import io.airbyte.cdk.read.optimize import io.airbyte.cdk.util.Jsons /** Base class for default implementations of [JdbcPartition] for non resumable partitions. */ -sealed class MysqlJdbcPartition( +sealed class MySqlSourceJdbcPartition( val selectQueryGenerator: SelectQueryGenerator, streamState: DefaultJdbcStreamState, ) : JdbcPartition { @@ -62,29 +62,29 @@ sealed class MysqlJdbcPartition( } /** Default implementation of a [JdbcPartition] for an unsplittable snapshot partition. */ -class MysqlJdbcNonResumableSnapshotPartition( +class MySqlSourceJdbcNonResumableSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, -) : MysqlJdbcPartition(selectQueryGenerator, streamState) { +) : MySqlSourceJdbcPartition(selectQueryGenerator, streamState) { - override val completeState: OpaqueStateValue = MysqlJdbcStreamStateValue.snapshotCompleted + override val completeState: OpaqueStateValue = MySqlSourceJdbcStreamStateValue.snapshotCompleted } /** * Default implementation of a [JdbcPartition] for an non resumable snapshot partition preceding a * cursor-based incremental sync. */ -class MysqlJdbcNonResumableSnapshotWithCursorPartition( +class MySqlSourceJdbcNonResumableSnapshotWithCursorPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, val cursor: Field, ) : - MysqlJdbcPartition(selectQueryGenerator, streamState), + MySqlSourceJdbcPartition(selectQueryGenerator, streamState), JdbcCursorPartition { override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorCheckpoint = streamState.cursorUpperBound!!, streamState.stream, @@ -97,12 +97,12 @@ class MysqlJdbcNonResumableSnapshotWithCursorPartition( } /** Base class for default implementations of [JdbcPartition] for partitions. */ -sealed class MysqlJdbcResumablePartition( +sealed class MySqlSourceJdbcResumablePartition( selectQueryGenerator: SelectQueryGenerator, streamState: DefaultJdbcStreamState, val checkpointColumns: List, ) : - MysqlJdbcPartition(selectQueryGenerator, streamState), + MySqlSourceJdbcPartition(selectQueryGenerator, streamState), JdbcSplittablePartition { abstract val lowerBound: List? abstract val upperBound: List? @@ -179,49 +179,49 @@ sealed class MysqlJdbcResumablePartition( } /** RFR for cursor based read. */ -class MysqlJdbcRfrSnapshotPartition( +class MySqlSourceJdbcRfrSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, override val lowerBound: List?, override val upperBound: List?, -) : MysqlJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { +) : MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { // TODO: this needs to reflect lastRecord. Complete state needs to have last primary key value // in RFR case. override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.snapshotCheckpoint( + MySqlSourceJdbcStreamStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { upperBound?.get(0) ?: Jsons.nullNode() }, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlJdbcStreamStateValue.snapshotCheckpoint( + MySqlSourceJdbcStreamStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, ) } /** RFR for CDC. */ -class MysqlJdbcCdcRfrSnapshotPartition( +class MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, override val lowerBound: List?, override val upperBound: List?, -) : MysqlJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { +) : MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { override val completeState: OpaqueStateValue get() = - MysqlCdcInitialSnapshotStateValue.snapshotCheckpoint( + MySqlSourceCdcInitialSnapshotStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { upperBound?.get(0) ?: Jsons.nullNode() }, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlCdcInitialSnapshotStateValue.snapshotCheckpoint( + MySqlSourceCdcInitialSnapshotStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, ) @@ -231,18 +231,18 @@ class MysqlJdbcCdcRfrSnapshotPartition( * Implementation of a [JdbcPartition] for a CDC snapshot partition. Used for incremental CDC * initial sync. */ -class MysqlJdbcCdcSnapshotPartition( +class MySqlSourceJdbcCdcSnapshotPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, override val lowerBound: List? -) : MysqlJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { +) : MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) { override val upperBound: List? = null override val completeState: OpaqueStateValue - get() = MysqlCdcInitialSnapshotStateValue.getSnapshotCompletedState(stream) + get() = MySqlSourceCdcInitialSnapshotStateValue.getSnapshotCompletedState(stream) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlCdcInitialSnapshotStateValue.snapshotCheckpoint( + MySqlSourceCdcInitialSnapshotStateValue.snapshotCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, ) @@ -251,14 +251,14 @@ class MysqlJdbcCdcSnapshotPartition( /** * Default implementation of a [JdbcPartition] for a splittable partition involving cursor columns. */ -sealed class MysqlJdbcCursorPartition( +sealed class MySqlSourceJdbcCursorPartition( selectQueryGenerator: SelectQueryGenerator, streamState: DefaultJdbcStreamState, checkpointColumns: List, val cursor: Field, private val explicitCursorUpperBound: JsonNode?, ) : - MysqlJdbcResumablePartition(selectQueryGenerator, streamState, checkpointColumns), + MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, checkpointColumns), JdbcCursorPartition { val cursorUpperBound: JsonNode @@ -274,7 +274,7 @@ sealed class MysqlJdbcCursorPartition( * Default implementation of a [JdbcPartition] for a splittable snapshot partition preceding a * cursor-based incremental sync. */ -class MysqlJdbcSnapshotWithCursorPartition( +class MySqlSourceJdbcSnapshotWithCursorPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, primaryKey: List, @@ -282,7 +282,7 @@ class MysqlJdbcSnapshotWithCursorPartition( cursor: Field, cursorUpperBound: JsonNode?, ) : - MysqlJdbcCursorPartition( + MySqlSourceJdbcCursorPartition( selectQueryGenerator, streamState, primaryKey, @@ -294,14 +294,14 @@ class MysqlJdbcSnapshotWithCursorPartition( override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorUpperBound, stream, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlJdbcStreamStateValue.snapshotWithCursorCheckpoint( + MySqlSourceJdbcStreamStateValue.snapshotWithCursorCheckpoint( primaryKey = checkpointColumns, primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() }, cursor, @@ -313,7 +313,7 @@ class MysqlJdbcSnapshotWithCursorPartition( * Default implementation of a [JdbcPartition] for a cursor incremental partition. These are always * splittable. */ -class MysqlJdbcCursorIncrementalPartition( +class MySqlSourceJdbcCursorIncrementalPartition( selectQueryGenerator: SelectQueryGenerator, override val streamState: DefaultJdbcStreamState, cursor: Field, @@ -321,7 +321,7 @@ class MysqlJdbcCursorIncrementalPartition( override val isLowerBoundIncluded: Boolean, cursorUpperBound: JsonNode?, ) : - MysqlJdbcCursorPartition( + MySqlSourceJdbcCursorPartition( selectQueryGenerator, streamState, listOf(cursor), @@ -335,14 +335,14 @@ class MysqlJdbcCursorIncrementalPartition( override val completeState: OpaqueStateValue get() = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorCheckpoint = cursorUpperBound, stream, ) override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue = - MysqlJdbcStreamStateValue.cursorIncrementalCheckpoint( + MySqlSourceJdbcStreamStateValue.cursorIncrementalCheckpoint( cursor, cursorCheckpoint = lastRecord[cursor.id] ?: Jsons.nullNode(), stream, diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactory.kt similarity index 90% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactory.kt index cc94ea2840af..b40ffc3e6331 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactory.kt @@ -39,15 +39,15 @@ import java.util.concurrent.ConcurrentHashMap @Primary @Singleton -class MysqlJdbcPartitionFactory( +class MySqlSourceJdbcPartitionFactory( override val sharedState: DefaultJdbcSharedState, - val selectQueryGenerator: MysqlSourceOperations, - val config: MysqlSourceConfiguration, + val selectQueryGenerator: MySqlSourceOperations, + val config: MySqlSourceConfiguration, ) : JdbcPartitionFactory< DefaultJdbcSharedState, DefaultJdbcStreamState, - MysqlJdbcPartition, + MySqlSourceJdbcPartition, > { private val streamStates = ConcurrentHashMap() @@ -78,13 +78,13 @@ class MysqlJdbcPartitionFactory( } } - private fun coldStart(streamState: DefaultJdbcStreamState): MysqlJdbcPartition { + private fun coldStart(streamState: DefaultJdbcStreamState): MySqlSourceJdbcPartition { val stream: Stream = streamState.stream val pkChosenFromCatalog: List = stream.configuredPrimaryKey ?: listOf() if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) { if (pkChosenFromCatalog.isEmpty()) { - return MysqlJdbcNonResumableSnapshotPartition( + return MySqlSourceJdbcNonResumableSnapshotPartition( selectQueryGenerator, streamState, ) @@ -92,7 +92,7 @@ class MysqlJdbcPartitionFactory( val upperBound = findPkUpperBound(stream, pkChosenFromCatalog) if (sharedState.configuration.global) { - return MysqlJdbcCdcRfrSnapshotPartition( + return MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -100,7 +100,7 @@ class MysqlJdbcPartitionFactory( upperBound = listOf(upperBound) ) } else { - return MysqlJdbcRfrSnapshotPartition( + return MySqlSourceJdbcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -111,7 +111,7 @@ class MysqlJdbcPartitionFactory( } if (sharedState.configuration.global) { - return MysqlJdbcCdcSnapshotPartition( + return MySqlSourceJdbcCdcSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -123,13 +123,13 @@ class MysqlJdbcPartitionFactory( stream.configuredCursor as? Field ?: throw ConfigErrorException("no cursor") if (pkChosenFromCatalog.isEmpty()) { - return MysqlJdbcNonResumableSnapshotWithCursorPartition( + return MySqlSourceJdbcNonResumableSnapshotWithCursorPartition( selectQueryGenerator, streamState, cursorChosenFromCatalog ) } - return MysqlJdbcSnapshotWithCursorPartition( + return MySqlSourceJdbcSnapshotWithCursorPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -157,7 +157,7 @@ class MysqlJdbcPartitionFactory( * ii. In cursor read phase, use cursor incremental. * ``` */ - override fun create(streamFeedBootstrap: StreamFeedBootstrap): MysqlJdbcPartition? { + override fun create(streamFeedBootstrap: StreamFeedBootstrap): MySqlSourceJdbcPartition? { val stream: Stream = streamFeedBootstrap.feed val streamState: DefaultJdbcStreamState = streamState(streamFeedBootstrap) @@ -187,19 +187,22 @@ class MysqlJdbcPartitionFactory( ) { if ( streamState.streamFeedBootstrap.currentState == - MysqlJdbcStreamStateValue.snapshotCompleted + MySqlSourceJdbcStreamStateValue.snapshotCompleted ) { return null } - return MysqlJdbcNonResumableSnapshotPartition( + return MySqlSourceJdbcNonResumableSnapshotPartition( selectQueryGenerator, streamState, ) } if (!isCursorBased) { - val sv: MysqlCdcInitialSnapshotStateValue = - Jsons.treeToValue(opaqueStateValue, MysqlCdcInitialSnapshotStateValue::class.java) + val sv: MySqlSourceCdcInitialSnapshotStateValue = + Jsons.treeToValue( + opaqueStateValue, + MySqlSourceCdcInitialSnapshotStateValue::class.java + ) if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) { val upperBound = findPkUpperBound(stream, pkChosenFromCatalog) @@ -208,7 +211,7 @@ class MysqlJdbcPartitionFactory( } val pkLowerBound: JsonNode = stateValueToJsonNode(pkChosenFromCatalog[0], sv.pkVal) - return MysqlJdbcRfrSnapshotPartition( + return MySqlSourceJdbcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -233,7 +236,7 @@ class MysqlJdbcPartitionFactory( if (sv.pkVal == upperBound.asText()) { return null } - return MysqlJdbcCdcRfrSnapshotPartition( + return MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -241,7 +244,7 @@ class MysqlJdbcPartitionFactory( upperBound = listOf(upperBound) ) } - return MysqlJdbcCdcSnapshotPartition( + return MySqlSourceJdbcCdcSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -249,8 +252,8 @@ class MysqlJdbcPartitionFactory( ) } } else { - val sv: MysqlJdbcStreamStateValue = - Jsons.treeToValue(opaqueStateValue, MysqlJdbcStreamStateValue::class.java) + val sv: MySqlSourceJdbcStreamStateValue = + Jsons.treeToValue(opaqueStateValue, MySqlSourceJdbcStreamStateValue::class.java) if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) { val upperBound = findPkUpperBound(stream, pkChosenFromCatalog) @@ -260,7 +263,7 @@ class MysqlJdbcPartitionFactory( val pkLowerBound: JsonNode = stateValueToJsonNode(pkChosenFromCatalog[0], sv.pkValue) - return MysqlJdbcCdcRfrSnapshotPartition( + return MySqlSourceJdbcCdcRfrSnapshotPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -279,7 +282,7 @@ class MysqlJdbcPartitionFactory( stream.configuredCursor as? Field ?: throw ConfigErrorException("no cursor") // in a state where it's still in primary_key read part. - return MysqlJdbcSnapshotWithCursorPartition( + return MySqlSourceJdbcSnapshotWithCursorPartition( selectQueryGenerator, streamState, pkChosenFromCatalog, @@ -302,7 +305,7 @@ class MysqlJdbcPartitionFactory( // Incremental complete. return null } - return MysqlJdbcCursorIncrementalPartition( + return MySqlSourceJdbcCursorIncrementalPartition( selectQueryGenerator, streamState, cursor, @@ -391,9 +394,9 @@ class MysqlJdbcPartitionFactory( } override fun split( - unsplitPartition: MysqlJdbcPartition, + unsplitPartition: MySqlSourceJdbcPartition, opaqueStateValues: List - ): List { + ): List { // At this moment we don't support split on within mysql stream in any mode. return listOf(unsplitPartition) } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcStreamStateValue.kt similarity index 84% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcStreamStateValue.kt index 68499f2fc206..53f755a06a5e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcStreamStateValue.kt @@ -11,10 +11,10 @@ import io.airbyte.cdk.discover.Field import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons -data class MysqlJdbcStreamStateValue( +data class MySqlSourceJdbcStreamStateValue( @JsonProperty("cursor") val cursors: String? = null, @JsonProperty("version") val version: Int = 2, - @JsonProperty("state_type") val stateType: String = StateType.CURSOR_BASED.stateType, + @JsonProperty("state_type") val stateType: String = StateType.CURSOR_BASED.serialized, @JsonProperty("stream_name") val streamName: String = "", @JsonProperty("cursor_field") val cursorField: List = listOf(), @JsonProperty("stream_namespace") val streamNamespace: String = "", @@ -26,7 +26,7 @@ data class MysqlJdbcStreamStateValue( companion object { /** Value representing the completion of a FULL_REFRESH snapshot. */ val snapshotCompleted: OpaqueStateValue - get() = Jsons.valueToTree(MysqlJdbcStreamStateValue(stateType = "primary_key")) + get() = Jsons.valueToTree(MySqlSourceJdbcStreamStateValue(stateType = "primary_key")) /** Value representing the progress of an ongoing incremental cursor read. */ fun cursorIncrementalCheckpoint( @@ -38,7 +38,7 @@ data class MysqlJdbcStreamStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( cursorField = listOf(cursor.id), cursors = cursorCheckpoint.asText(), streamName = stream.name, @@ -58,10 +58,10 @@ data class MysqlJdbcStreamStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( pkName = primaryKeyField.id, pkValue = primaryKeyCheckpoint.first().asText(), - stateType = StateType.PRIMARY_KEY.stateType, + stateType = StateType.PRIMARY_KEY.serialized, ) ) } @@ -79,13 +79,13 @@ data class MysqlJdbcStreamStateValue( true -> Jsons.nullNode() false -> Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( pkName = primaryKeyField.id, pkValue = primaryKeyCheckpoint.first().asText(), - stateType = StateType.PRIMARY_KEY.stateType, + stateType = StateType.PRIMARY_KEY.serialized, incrementalState = Jsons.valueToTree( - MysqlJdbcStreamStateValue( + MySqlSourceJdbcStreamStateValue( cursorField = listOf(cursor.id), streamName = stream.name, streamNamespace = stream.namespace!! @@ -96,9 +96,12 @@ data class MysqlJdbcStreamStateValue( } } } -} -enum class StateType(val stateType: String) { - PRIMARY_KEY("primary_key"), - CURSOR_BASED("cursor_based"), + enum class StateType { + PRIMARY_KEY, + CURSOR_BASED, + ; + + val serialized: String = name.lowercase() + } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceMetadataQuerier.kt similarity index 95% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceMetadataQuerier.kt index 1130fb8c26d5..54e0a2d6f146 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceMetadataQuerier.kt @@ -24,7 +24,7 @@ import java.sql.Statement private val log = KotlinLogging.logger {} /** Delegates to [JdbcMetadataQuerier] except for [fields]. */ -class MysqlSourceMetadataQuerier( +class MySqlSourceMetadataQuerier( val base: JdbcMetadataQuerier, ) : MetadataQuerier by base { @@ -116,7 +116,7 @@ class MysqlSourceMetadataQuerier( val results = mutableListOf() val schemas: List = streamNamespaces() val sql: String = PK_QUERY_FMTSTR.format(schemas.joinToString { "\'$it\'" }) - log.info { "Querying Mysql system tables for all primary keys for catalog discovery." } + log.info { "Querying MySQL system tables for all primary keys for catalog discovery." } try { // Get primary keys for the specified table base.conn.createStatement().use { stmt: Statement -> @@ -134,7 +134,7 @@ class MysqlSourceMetadataQuerier( } } } - log.info { "Discovered all primary keys in ${schemas.size} Mysql schema(s)." } + log.info { "Discovered all primary keys in ${schemas.size} MySQL schema(s)." } return@lazy results .groupBy { findTableName( @@ -163,7 +163,7 @@ class MysqlSourceMetadataQuerier( } .toMap() } catch (e: Exception) { - throw RuntimeException("Mysql primary key discovery query failed: ${e.message}", e) + throw RuntimeException("MySQL primary key discovery query failed: ${e.message}", e) } } @@ -200,7 +200,7 @@ class MysqlSourceMetadataQuerier( """ } - /** Mysql implementation of [MetadataQuerier.Factory]. */ + /** MySQL implementation of [MetadataQuerier.Factory]. */ @Singleton @Primary class Factory( @@ -208,9 +208,9 @@ class MysqlSourceMetadataQuerier( val selectQueryGenerator: SelectQueryGenerator, val fieldTypeMapper: JdbcMetadataQuerier.FieldTypeMapper, val checkQueries: JdbcCheckQueries, - ) : MetadataQuerier.Factory { + ) : MetadataQuerier.Factory { /** The [SourceConfiguration] is deliberately not injected in order to support tests. */ - override fun session(config: MysqlSourceConfiguration): MetadataQuerier { + override fun session(config: MySqlSourceConfiguration): MetadataQuerier { val jdbcConnectionFactory = JdbcConnectionFactory(config) val base = JdbcMetadataQuerier( @@ -221,7 +221,7 @@ class MysqlSourceMetadataQuerier( checkQueries, jdbcConnectionFactory, ) - return MysqlSourceMetadataQuerier(base) + return MySqlSourceMetadataQuerier(base) } } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt similarity index 93% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt index 93e6467f2be4..f1231f5093ce 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt @@ -65,26 +65,24 @@ import io.airbyte.cdk.read.WhereClauseNode import io.airbyte.cdk.read.WhereNode import io.airbyte.cdk.read.cdc.DebeziumState import io.airbyte.cdk.util.Jsons -import io.airbyte.integrations.source.mysql.cdc.MySqlDebeziumOperations -import io.airbyte.integrations.source.mysql.cdc.MySqlPosition import io.micronaut.context.annotation.Primary import jakarta.inject.Singleton import java.time.OffsetDateTime @Singleton @Primary -class MysqlSourceOperations : +class MySqlSourceOperations : JdbcMetadataQuerier.FieldTypeMapper, SelectQueryGenerator, JdbcAirbyteStreamFactory { - override val globalCursor: MetaField = MysqlCdcMetaFields.CDC_CURSOR + override val globalCursor: MetaField = MySqlSourceCdcMetaFields.CDC_CURSOR override val globalMetaFields: Set = setOf( - MysqlCdcMetaFields.CDC_CURSOR, + MySqlSourceCdcMetaFields.CDC_CURSOR, CommonMetaField.CDC_UPDATED_AT, CommonMetaField.CDC_DELETED_AT, - MysqlCdcMetaFields.CDC_LOG_FILE, - MysqlCdcMetaFields.CDC_LOG_POS + MySqlSourceCdcMetaFields.CDC_LOG_FILE, + MySqlSourceCdcMetaFields.CDC_LOG_POS ) override fun decorateRecordData( @@ -98,21 +96,22 @@ class MysqlSourceOperations : CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp), ) recordData.set( - MysqlCdcMetaFields.CDC_LOG_POS.id, + MySqlSourceCdcMetaFields.CDC_LOG_POS.id, CdcIntegerMetaFieldType.jsonEncoder.encode(0), ) if (globalStateValue == null) { return } val debeziumState: DebeziumState = - MySqlDebeziumOperations.deserializeDebeziumState(globalStateValue) - val position: MySqlPosition = MySqlDebeziumOperations.position(debeziumState.offset) + MySqlSourceDebeziumOperations.deserializeDebeziumState(globalStateValue) + val position: MySqlSourceCdcPosition = + MySqlSourceDebeziumOperations.position(debeziumState.offset) recordData.set( - MysqlCdcMetaFields.CDC_LOG_FILE.id, + MySqlSourceCdcMetaFields.CDC_LOG_FILE.id, CdcStringMetaFieldType.jsonEncoder.encode(position.fileName), ) recordData.set( - MysqlCdcMetaFields.CDC_LOG_POS.id, + MySqlSourceCdcMetaFields.CDC_LOG_POS.id, CdcIntegerMetaFieldType.jsonEncoder.encode(position.position), ) } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSelectQuerier.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQuerier.kt similarity index 90% rename from airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSelectQuerier.kt rename to airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQuerier.kt index d311c1abb8d7..7617437e6b22 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSelectQuerier.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQuerier.kt @@ -15,7 +15,7 @@ import jakarta.inject.Singleton /** Mysql implementation of [JdbcSelectQuerier], which sets fetch size differently */ @Singleton @Primary -class MysqlSelectQuerier( +class MySqlSourceSelectQuerier( private val jdbcConnectionFactory: JdbcConnectionFactory, ) : SelectQuerier by JdbcSelectQuerier(jdbcConnectionFactory) { private val log = KotlinLogging.logger {} @@ -23,15 +23,15 @@ class MysqlSelectQuerier( override fun executeQuery( q: SelectQuery, parameters: SelectQuerier.Parameters, - ): SelectQuerier.Result = MysqlResult(jdbcConnectionFactory, q, parameters) + ): SelectQuerier.Result = MySqlResult(jdbcConnectionFactory, q, parameters) - inner class MysqlResult( + inner class MySqlResult( jdbcConnectionFactory: JdbcConnectionFactory, q: SelectQuery, parameters: SelectQuerier.Parameters, ) : JdbcSelectQuerier.Result(jdbcConnectionFactory, q, parameters) { /** - * Mysql does things differently with fetch size. Setting fetch size on a result set is + * MySQL does things differently with fetch size. Setting fetch size on a result set is * safer than on a statement. */ override fun initQueryExecution() { diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlContainerFactory.kt similarity index 83% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlContainerFactory.kt index 14f3e4ab555a..d67a632935c6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlContainerFactory.kt @@ -8,7 +8,7 @@ import org.testcontainers.containers.MySQLContainer import org.testcontainers.containers.Network import org.testcontainers.utility.DockerImageName -object MysqlContainerFactory { +object MySqlContainerFactory { const val COMPATIBLE_NAME = "mysql:8.0" private val log = KotlinLogging.logger {} @@ -16,16 +16,16 @@ object MysqlContainerFactory { TestContainerFactory.register(COMPATIBLE_NAME, ::MySQLContainer) } - sealed interface MysqlContainerModifier : + sealed interface MySqlContainerModifier : TestContainerFactory.ContainerModifier> - data object WithNetwork : MysqlContainerModifier { + data object WithNetwork : MySqlContainerModifier { override fun modify(container: MySQLContainer<*>) { container.withNetwork(Network.newNetwork()) } } - data object WithCdc : MysqlContainerModifier { + data object WithCdc : MySqlContainerModifier { override fun modify(container: MySQLContainer<*>) { container.start() container.execAsRoot(GTID_ON) @@ -44,7 +44,7 @@ object MysqlContainerFactory { "ON *.* TO '%s'@'%%';" } - data object WithCdcOff : MysqlContainerModifier { + data object WithCdcOff : MySqlContainerModifier { override fun modify(container: MySQLContainer<*>) { container.withCommand("--skip-log-bin") } @@ -52,7 +52,7 @@ object MysqlContainerFactory { fun exclusive( imageName: String, - vararg modifiers: MysqlContainerModifier, + vararg modifiers: MySqlContainerModifier, ): MySQLContainer<*> { val dockerImageName = DockerImageName.parse(imageName).asCompatibleSubstituteFor(COMPATIBLE_NAME) @@ -61,7 +61,7 @@ object MysqlContainerFactory { fun shared( imageName: String, - vararg modifiers: MysqlContainerModifier, + vararg modifiers: MySqlContainerModifier, ): MySQLContainer<*> { val dockerImageName = DockerImageName.parse(imageName).asCompatibleSubstituteFor(COMPATIBLE_NAME) @@ -69,8 +69,8 @@ object MysqlContainerFactory { } @JvmStatic - fun config(mySQLContainer: MySQLContainer<*>): MysqlSourceConfigurationSpecification = - MysqlSourceConfigurationSpecification().apply { + fun config(mySQLContainer: MySQLContainer<*>): MySqlSourceConfigurationSpecification = + MySqlSourceConfigurationSpecification().apply { host = mySQLContainer.host port = mySQLContainer.getMappedPort(MySQLContainer.MYSQL_PORT) username = mySQLContainer.username @@ -82,10 +82,6 @@ object MysqlContainerFactory { setMethodValue(UserDefinedCursor) } - @JvmStatic - fun cdcConfig(mySQLContainer: MySQLContainer<*>): MysqlSourceConfigurationSpecification = - config(mySQLContainer).also { it.setMethodValue(CdcCursor()) } - fun MySQLContainer<*>.execAsRoot(sql: String) { val cleanSql: String = sql.trim().removeSuffix(";") + ";" log.info { "Executing SQL as root: $cleanSql" } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt similarity index 88% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt index e74856b3be50..f172df4d01d6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcIntegrationTest.kt @@ -12,7 +12,7 @@ import io.airbyte.cdk.jdbc.IntFieldType import io.airbyte.cdk.jdbc.JdbcConnectionFactory import io.airbyte.cdk.jdbc.StringFieldType import io.airbyte.cdk.output.BufferingOutputConsumer -import io.airbyte.integrations.source.mysql.MysqlContainerFactory.execAsRoot +import io.airbyte.integrations.source.mysql.MySqlContainerFactory.execAsRoot import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage @@ -31,7 +31,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.testcontainers.containers.MySQLContainer -class MysqlCdcIntegrationTest { +class MySqlSourceCdcIntegrationTest { @Test fun testCheck() { @@ -43,19 +43,19 @@ class MysqlCdcIntegrationTest { AirbyteConnectionStatus.Status.SUCCEEDED ) - MysqlContainerFactory.exclusive( + MySqlContainerFactory.exclusive( imageName = "mysql:8.0", - MysqlContainerFactory.WithCdcOff, + MySqlContainerFactory.WithCdcOff, ) .use { nonCdcDbContainer -> { - val invalidConfig: MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(nonCdcDbContainer).apply { + val invalidConfig: MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(nonCdcDbContainer).apply { setMethodValue(CdcCursor()) } val nonCdcConnectionFactory = - JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(invalidConfig)) + JdbcConnectionFactory(MySqlSourceConfigurationFactory().make(invalidConfig)) provisionTestContainer(nonCdcDbContainer, nonCdcConnectionFactory) @@ -108,11 +108,11 @@ class MysqlCdcIntegrationTest { val log = KotlinLogging.logger {} lateinit var dbContainer: MySQLContainer<*> - fun config(): MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(dbContainer).apply { setMethodValue(CdcCursor()) } + fun config(): MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(dbContainer).apply { setMethodValue(CdcCursor()) } val connectionFactory: JdbcConnectionFactory by lazy { - JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(config())) + JdbcConnectionFactory(MySqlSourceConfigurationFactory().make(config())) } val configuredCatalog: ConfiguredAirbyteCatalog = run { @@ -123,12 +123,12 @@ class MysqlCdcIntegrationTest { columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)), primaryKeyColumnIDs = listOf(listOf("k")), ) - val stream: AirbyteStream = MysqlSourceOperations().createGlobal(discoveredStream) + val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream) val configuredStream: ConfiguredAirbyteStream = CatalogHelpers.toDefaultConfiguredStream(stream) .withSyncMode(SyncMode.INCREMENTAL) .withPrimaryKey(discoveredStream.primaryKeyColumnIDs) - .withCursorField(listOf(MysqlCdcMetaFields.CDC_CURSOR.id)) + .withCursorField(listOf(MySqlSourceCdcMetaFields.CDC_CURSOR.id)) ConfiguredAirbyteCatalog().withStreams(listOf(configuredStream)) } @@ -137,9 +137,9 @@ class MysqlCdcIntegrationTest { @Timeout(value = 300) fun startAndProvisionTestContainer() { dbContainer = - MysqlContainerFactory.exclusive( + MySqlContainerFactory.exclusive( imageName = "mysql:8.0", - MysqlContainerFactory.WithNetwork, + MySqlContainerFactory.WithNetwork, ) provisionTestContainer(dbContainer, connectionFactory) } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecificationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecificationTest.kt similarity index 90% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecificationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecificationTest.kt index ee38781a8307..51e41a392652 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationSpecificationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecificationTest.kt @@ -13,9 +13,9 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @MicronautTest(environments = [Environment.TEST], rebuildContext = true) -class MysqlSourceConfigurationSpecificationTestTest { +class MySqlSourceConfigurationSpecificationTest { @Inject - lateinit var supplier: ConfigurationSpecificationSupplier + lateinit var supplier: ConfigurationSpecificationSupplier @Test fun testSchemaViolation() { @@ -25,7 +25,7 @@ class MysqlSourceConfigurationSpecificationTestTest { @Test @Property(name = "airbyte.connector.config.json", value = CONFIG_JSON) fun testJson() { - val pojo: MysqlSourceConfigurationSpecification = supplier.get() + val pojo: MySqlSourceConfigurationSpecification = supplier.get() Assertions.assertEquals("localhost", pojo.host) Assertions.assertEquals(12345, pojo.port) Assertions.assertEquals("FOO", pojo.username) @@ -41,10 +41,11 @@ class MysqlSourceConfigurationSpecificationTestTest { Assertions.assertEquals(60, pojo.checkpointTargetIntervalSeconds) Assertions.assertEquals(2, pojo.concurrency) } -} -const val CONFIG_JSON: String = - """ + companion object { + + const val CONFIG_JSON: String = + """ { "host": "localhost", "port": 12345, @@ -69,3 +70,5 @@ const val CONFIG_JSON: String = "concurrency": 2 } """ + } +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationTest.kt similarity index 81% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationTest.kt index 98fd548677fa..66294e7acb1f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceConfigurationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationTest.kt @@ -15,14 +15,14 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @MicronautTest(environments = [Environment.TEST, AIRBYTE_CLOUD_ENV], rebuildContext = true) -class MysqlSourceConfigurationTest { +class MySqlSourceConfigurationTest { @Inject lateinit var pojoSupplier: - ConfigurationSpecificationSupplier + ConfigurationSpecificationSupplier @Inject lateinit var factory: - SourceConfigurationFactory + SourceConfigurationFactory @Test @Property(name = "airbyte.connector.config.host", value = "localhost") @@ -36,7 +36,7 @@ class MysqlSourceConfigurationTest { value = "theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar&" ) fun testParseJdbcParameters() { - val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get() + val pojo: MySqlSourceConfigurationSpecification = pojoSupplier.get() val config = factory.makeWithoutExceptionHandling(pojo) @@ -63,7 +63,7 @@ class MysqlSourceConfigurationTest { @Property(name = "airbyte.connector.config.password", value = "BAR") @Property(name = "airbyte.connector.config.database", value = "SYSTEM") fun testAirbyteCloudDeployment() { - val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get() + val pojo: MySqlSourceConfigurationSpecification = pojoSupplier.get() Assertions.assertThrows(ConfigErrorException::class.java) { factory.makeWithoutExceptionHandling(pojo) } @@ -72,7 +72,7 @@ class MysqlSourceConfigurationTest { @Test @Property(name = "airbyte.connector.config.json", value = CONFIG_V1) fun testParseConfigFromV1() { - val pojo: MysqlSourceConfigurationSpecification = pojoSupplier.get() + val pojo: MySqlSourceConfigurationSpecification = pojoSupplier.get() val config = factory.makeWithoutExceptionHandling(pojo) @@ -96,37 +96,11 @@ class MysqlSourceConfigurationTest { Assertions.assertTrue(config.sshTunnel is SshNoTunnelMethod) } -} -const val CONFIG: String = - """ -{ - "host": "localhost", - "port": 12345, - "username": "FOO", - "password": "BAR", - "database": "SYSTEM", - "ssl_mode": { - "mode": "preferred" - }, - "tunnel_method": { - "tunnel_method": "SSH_PASSWORD_AUTH", - "tunnel_host": "localhost", - "tunnel_port": 2222, - "tunnel_user": "sshuser", - "tunnel_user_password": "***" - }, - "replication_method": { - "method": "STANDARD" - }, - "checkpoint_target_interval_seconds": 60, - "jdbc_url_params": "theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar&", - "concurrency": 2 -} -""" + companion object { -const val CONFIG_V1: String = - """ + const val CONFIG_V1: String = + """ { "host": "localhost", "port": 12345, @@ -147,3 +121,5 @@ const val CONFIG_V1: String = } } """ + } +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCursorBasedIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt similarity index 96% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCursorBasedIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt index 2447c49adf5f..800e5a9d2786 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCursorBasedIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt @@ -32,7 +32,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import org.testcontainers.containers.MySQLContainer -class MysqlCursorBasedIntegrationTest { +class MySqlSourceCursorBasedIntegrationTest { @BeforeEach fun resetTable() { @@ -161,13 +161,13 @@ class MysqlCursorBasedIntegrationTest { companion object { val log = KotlinLogging.logger {} - val dbContainer: MySQLContainer<*> = MysqlContainerFactory.shared(imageName = "mysql:8.0") + val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:8.0") - val config: MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(dbContainer) + val config: MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(dbContainer) val connectionFactory: JdbcConnectionFactory by lazy { - JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(config)) + JdbcConnectionFactory(MySqlSourceConfigurationFactory().make(config)) } fun getConfiguredCatalog(): ConfiguredAirbyteCatalog { @@ -178,7 +178,7 @@ class MysqlCursorBasedIntegrationTest { columns = listOf(Field("k", IntFieldType), Field("v", StringFieldType)), primaryKeyColumnIDs = listOf(listOf("k")), ) - val stream: AirbyteStream = MysqlSourceOperations().createGlobal(discoveredStream) + val stream: AirbyteStream = MySqlSourceOperations().createGlobal(discoveredStream) val configuredStream: ConfiguredAirbyteStream = CatalogHelpers.toDefaultConfiguredStream(stream) .withSyncMode(SyncMode.INCREMENTAL) diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlDatatypeIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt similarity index 76% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlDatatypeIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt index b68e89cbe94b..247ac22f3956 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlDatatypeIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt @@ -19,12 +19,12 @@ import org.junit.jupiter.api.TestFactory import org.junit.jupiter.api.Timeout import org.testcontainers.containers.MySQLContainer -class MySqlDatatypeIntegrationTest { +class MySqlSourceDatatypeIntegrationTest { @TestFactory @Timeout(300) fun syncTests(): Iterable = - DynamicDatatypeTestFactory(MySqlDatatypeTestOperations).build(dbContainer) + DynamicDatatypeTestFactory(MySqlSourceDatatypeTestOperations).build(dbContainer) companion object { @@ -34,38 +34,38 @@ class MySqlDatatypeIntegrationTest { @BeforeAll @Timeout(value = 300) fun startAndProvisionTestContainer() { - dbContainer = MysqlContainerFactory.shared("mysql:8.0", MysqlContainerFactory.WithCdc) + dbContainer = MySqlContainerFactory.shared("mysql:8.0", MySqlContainerFactory.WithCdc) } } } -object MySqlDatatypeTestOperations : +object MySqlSourceDatatypeTestOperations : DatatypeTestOperations< MySQLContainer<*>, - MysqlSourceConfigurationSpecification, - MysqlSourceConfiguration, - MysqlSourceConfigurationFactory, - MySqlDatatypeTestCase + MySqlSourceConfigurationSpecification, + MySqlSourceConfiguration, + MySqlSourceConfigurationFactory, + MySqlSourceDatatypeTestCase > { private val log = KotlinLogging.logger {} override val withGlobal: Boolean = true - override val globalCursorMetaField: MetaField = MysqlCdcMetaFields.CDC_CURSOR + override val globalCursorMetaField: MetaField = MySqlSourceCdcMetaFields.CDC_CURSOR override fun streamConfigSpec( container: MySQLContainer<*> - ): MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(container).also { it.setMethodValue(UserDefinedCursor) } + ): MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(container).also { it.setMethodValue(UserDefinedCursor) } override fun globalConfigSpec( container: MySQLContainer<*> - ): MysqlSourceConfigurationSpecification = - MysqlContainerFactory.config(container).also { it.setMethodValue(CdcCursor()) } + ): MySqlSourceConfigurationSpecification = + MySqlContainerFactory.config(container).also { it.setMethodValue(CdcCursor()) } - override val configFactory: MysqlSourceConfigurationFactory = MysqlSourceConfigurationFactory() + override val configFactory: MySqlSourceConfigurationFactory = MySqlSourceConfigurationFactory() - override fun createStreams(config: MysqlSourceConfiguration) { + override fun createStreams(config: MySqlSourceConfiguration) { JdbcConnectionFactory(config).get().use { connection: Connection -> connection.isReadOnly = false connection.createStatement().use { it.execute("CREATE DATABASE IF NOT EXISTS test") } @@ -79,7 +79,7 @@ object MySqlDatatypeTestOperations : } } - override fun populateStreams(config: MysqlSourceConfiguration) { + override fun populateStreams(config: MySqlSourceConfiguration) { JdbcConnectionFactory(config).get().use { connection: Connection -> connection.isReadOnly = false connection.createStatement().use { it.execute("USE test") } @@ -202,144 +202,149 @@ object MySqlDatatypeTestOperations : "X'89504E470D0A1A0A0000000D49484452'" to """"iVBORw0KGgoAAAANSUhEUg=="""", ) - override val testCases: Map = + override val testCases: Map = listOf( - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BOOLEAN", booleanValues, LeafAirbyteSchemaType.BOOLEAN, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "VARCHAR(10)", stringValues, LeafAirbyteSchemaType.STRING, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL(10,2)", decimalValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL(10,2) UNSIGNED", decimalValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DECIMAL UNSIGNED", zeroPrecisionDecimalValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("FLOAT", floatValues, LeafAirbyteSchemaType.NUMBER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("FLOAT", floatValues, LeafAirbyteSchemaType.NUMBER), + MySqlSourceDatatypeTestCase( + "FLOAT(34)", + floatValues, + LeafAirbyteSchemaType.NUMBER, + ), + MySqlSourceDatatypeTestCase( "FLOAT(7,4)", floatValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "FLOAT(53,8)", floatValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase("DOUBLE", decimalValues, LeafAirbyteSchemaType.NUMBER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("DOUBLE", decimalValues, LeafAirbyteSchemaType.NUMBER), + MySqlSourceDatatypeTestCase( "DOUBLE UNSIGNED", decimalValues, LeafAirbyteSchemaType.NUMBER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "TINYINT", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "TINYINT UNSIGNED", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "SMALLINT", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "MEDIUMINT", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("BIGINT", intValues, LeafAirbyteSchemaType.INTEGER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("BIGINT", intValues, LeafAirbyteSchemaType.INTEGER), + MySqlSourceDatatypeTestCase( "SMALLINT UNSIGNED", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "MEDIUMINT UNSIGNED", tinyintValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BIGINT UNSIGNED", intValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("INT", intValues, LeafAirbyteSchemaType.INTEGER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("INT", intValues, LeafAirbyteSchemaType.INTEGER), + MySqlSourceDatatypeTestCase( "INT UNSIGNED", intValues, LeafAirbyteSchemaType.INTEGER, ), - MySqlDatatypeTestCase("DATE", dateValues, LeafAirbyteSchemaType.DATE), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("DATE", dateValues, LeafAirbyteSchemaType.DATE), + MySqlSourceDatatypeTestCase( "TIMESTAMP", timestampValues, LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "DATETIME", dateTimeValues, LeafAirbyteSchemaType.TIMESTAMP_WITHOUT_TIMEZONE, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "TIME", timeValues, LeafAirbyteSchemaType.TIME_WITHOUT_TIMEZONE, ), - MySqlDatatypeTestCase("YEAR", yearValues, LeafAirbyteSchemaType.INTEGER), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase("YEAR", yearValues, LeafAirbyteSchemaType.INTEGER), + MySqlSourceDatatypeTestCase( "VARBINARY(255)", binaryValues, LeafAirbyteSchemaType.BINARY, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BIT", bitValues, LeafAirbyteSchemaType.BOOLEAN, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BIT(8)", longBitValues, LeafAirbyteSchemaType.INTEGER, isGlobal = false, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "BIT(8)", longBitCdcValues, LeafAirbyteSchemaType.INTEGER, isStream = false, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "JSON", jsonValues, LeafAirbyteSchemaType.STRING, isGlobal = false, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "JSON", jsonCdcValues, LeafAirbyteSchemaType.STRING, isStream = false, ), - MySqlDatatypeTestCase( + MySqlSourceDatatypeTestCase( "ENUM('a', 'b', 'c')", enumValues, LeafAirbyteSchemaType.STRING, @@ -348,7 +353,7 @@ object MySqlDatatypeTestOperations : .associateBy { it.id } } -data class MySqlDatatypeTestCase( +data class MySqlSourceDatatypeTestCase( val sqlType: String, val sqlToAirbyte: Map, override val expectedAirbyteSchemaType: AirbyteSchemaType, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt similarity index 84% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt index a363e99f42f1..f5ff8be708f4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactoryTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactoryTest.kt @@ -32,7 +32,6 @@ import io.airbyte.protocol.models.v0.StreamDescriptor import io.mockk.mockk import java.time.OffsetDateTime import java.util.Base64 -import kotlin.test.assertEquals import kotlin.test.assertNull import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue @@ -40,17 +39,17 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource -class MysqlJdbcPartitionFactoryTest { +class MySqlSourceJdbcPartitionFactoryTest { companion object { - private val selectQueryGenerator = MysqlSourceOperations() + private val selectQueryGenerator = MySqlSourceOperations() private val sharedState = sharedState() private val cdcSharedState = sharedState(global = true) - private val config = mockk() + private val config = mockk() - val mysqlJdbcPartitionFactory = - MysqlJdbcPartitionFactory(sharedState, selectQueryGenerator, config) + val mySqlSourceJdbcPartitionFactory = + MySqlSourceJdbcPartitionFactory(sharedState, selectQueryGenerator, config) val mysqlCdcJdbcPartitionFactory = - MysqlJdbcPartitionFactory(cdcSharedState, selectQueryGenerator, config) + MySqlSourceJdbcPartitionFactory(cdcSharedState, selectQueryGenerator, config) val fieldId = Field("id", IntFieldType) val stream = @@ -111,7 +110,7 @@ class MysqlJdbcPartitionFactoryTest { ): DefaultJdbcSharedState { val configSpec = - MysqlSourceConfigurationSpecification().apply { + MySqlSourceConfigurationSpecification().apply { host = "" port = 0 username = "foo" @@ -123,7 +122,7 @@ class MysqlJdbcPartitionFactoryTest { } else { configSpec.setMethodValue(UserDefinedCursor) } - val configFactory = MysqlSourceConfigurationFactory() + val configFactory = MySqlSourceConfigurationFactory() val configuration = configFactory.make(configSpec) val mockSelectQuerier = mockk() @@ -170,14 +169,14 @@ class MysqlJdbcPartitionFactoryTest { @Test fun testColdStartWithPkCursorBased() { - val jdbcPartition = mysqlJdbcPartitionFactory.create(streamFeedBootstrap(stream)) - assertTrue(jdbcPartition is MysqlJdbcSnapshotWithCursorPartition) + val jdbcPartition = mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(stream)) + assertTrue(jdbcPartition is MySqlSourceJdbcSnapshotWithCursorPartition) } @Test fun testColdStartWithPkCdc() { val jdbcPartition = mysqlCdcJdbcPartitionFactory.create(streamFeedBootstrap(stream)) - assertTrue(jdbcPartition is MysqlJdbcCdcSnapshotPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCdcSnapshotPartition) } @Test @@ -193,8 +192,9 @@ class MysqlJdbcPartitionFactoryTest { configuredPrimaryKey = listOf(), configuredCursor = fieldId, ) - val jdbcPartition = mysqlJdbcPartitionFactory.create(streamFeedBootstrap(streamWithoutPk)) - assertTrue(jdbcPartition is MysqlJdbcNonResumableSnapshotWithCursorPartition) + val jdbcPartition = + mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(streamWithoutPk)) + assertTrue(jdbcPartition is MySqlSourceJdbcNonResumableSnapshotWithCursorPartition) } @Test @@ -217,8 +217,8 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) } @ParameterizedTest @@ -256,14 +256,14 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create( + mySqlSourceJdbcPartitionFactory.create( streamFeedBootstrap(timestampStream, incomingStateValue) ) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) assertEquals( - Jsons.valueToTree("$expectedLowerBound"), - (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound + Jsons.valueToTree(expectedLowerBound), + (jdbcPartition as MySqlSourceJdbcCursorIncrementalPartition).cursorLowerBound ) } @@ -287,14 +287,14 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create( + mySqlSourceJdbcPartitionFactory.create( streamFeedBootstrap(datetimeStream, incomingStateValue) ) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) assertEquals( Jsons.valueToTree("2024-11-21T11:59:57.123000"), - (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound + (jdbcPartition as MySqlSourceJdbcCursorIncrementalPartition).cursorLowerBound ) } @@ -314,9 +314,9 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) + mySqlSourceJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcSnapshotWithCursorPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcSnapshotWithCursorPartition) } @Test @@ -336,7 +336,7 @@ class MysqlJdbcPartitionFactoryTest { val jdbcPartition = mysqlCdcJdbcPartitionFactory.create(streamFeedBootstrap(stream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcCdcSnapshotPartition) + assertTrue(jdbcPartition is MySqlSourceJdbcCdcSnapshotPartition) } @Test @@ -377,12 +377,14 @@ class MysqlJdbcPartitionFactoryTest { ) val jdbcPartition = - mysqlJdbcPartitionFactory.create(streamFeedBootstrap(binaryStream, incomingStateValue)) - assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition) + mySqlSourceJdbcPartitionFactory.create( + streamFeedBootstrap(binaryStream, incomingStateValue) + ) + assertTrue(jdbcPartition is MySqlSourceJdbcCursorIncrementalPartition) assertEquals( Jsons.valueToTree(Base64.getDecoder().decode("OQAAAAAAAAAAAAAAAAAAAA==")), - (jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound + (jdbcPartition as MySqlSourceJdbcCursorIncrementalPartition).cursorLowerBound ) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceSelectQueryGeneratorTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQueryGeneratorTest.kt similarity index 97% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceSelectQueryGeneratorTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQueryGeneratorTest.kt index 14fb19b67917..226ba557234e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceSelectQueryGeneratorTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSelectQueryGeneratorTest.kt @@ -27,7 +27,7 @@ import io.airbyte.cdk.util.Jsons import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test -class MysqlSourceSelectQueryGeneratorTest { +class MySqlSourceSelectQueryGeneratorTest { @Test fun testSelectLimit0() { SelectQuerySpec( @@ -139,7 +139,7 @@ class MysqlSourceSelectQueryGeneratorTest { select.columns, bindings.map { SelectQuery.Binding(it.first, it.second) }, ) - val actual: SelectQuery = MysqlSourceOperations().generate(this.optimize()) + val actual: SelectQuery = MySqlSourceOperations().generate(this.optimize()) Assertions.assertEquals(expected, actual) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSpecIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSpecIntegrationTest.kt similarity index 87% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSpecIntegrationTest.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSpecIntegrationTest.kt index 63a29db649b6..bd77db88b100 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSpecIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceSpecIntegrationTest.kt @@ -4,7 +4,7 @@ package io.airbyte.integrations.source.mysql import io.airbyte.cdk.command.SyncsTestFixture import org.junit.jupiter.api.Test -class MysqlSpecIntegrationTest { +class MySqlSourceSpecIntegrationTest { @Test fun testSpec() { SyncsTestFixture.testSpec("expected-spec.json") diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceTestConfigurationFactory.kt similarity index 73% rename from airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt rename to airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceTestConfigurationFactory.kt index 60930cb82823..ef6621958f59 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceTestConfigurationFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceTestConfigurationFactory.kt @@ -12,12 +12,12 @@ import java.time.Duration @Singleton @Requires(env = [Environment.TEST]) @Primary -class MysqlSourceTestConfigurationFactory(val featureFlags: Set) : - SourceConfigurationFactory { +class MySqlSourceTestConfigurationFactory(val featureFlags: Set) : + SourceConfigurationFactory { override fun makeWithoutExceptionHandling( - pojo: MysqlSourceConfigurationSpecification, - ): MysqlSourceConfiguration = - MysqlSourceConfigurationFactory(featureFlags) + pojo: MySqlSourceConfigurationSpecification, + ): MySqlSourceConfiguration = + MySqlSourceConfigurationFactory(featureFlags) .makeWithoutExceptionHandling(pojo) .copy( maxConcurrency = 1, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/dummy_config.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/dummy_config.json deleted file mode 100644 index e17733f16b23..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/dummy_config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "host": "default", - "port": 5555, - "database": "default", - "username": "default", - "replication_method": { "method": "STANDARD" } -} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json index 329b2434bd72..4d5e72c0953c 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected-spec.json @@ -1,390 +1,390 @@ { - "documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql", "connectionSpecification": { - "type": "object", - "title": "Mysql Source Spec", "$schema": "http://json-schema.org/draft-07/schema#", - "required": ["host", "port", "database", "username", "replication_method"], + "additionalProperties": true, "properties": { - "host": { - "type": "string", - "order": 1, - "title": "Host", - "description": "Hostname of the database." + "check_privileges": { + "default": true, + "description": "When this feature is enabled, during schema discovery the connector will query each table or view individually to check access privileges and inaccessible tables, views, or columns therein will be removed. In large schemas, this might cause schema discovery to take too long, in which case it might be advisable to disable this feature.", + "order": 13, + "title": "Check Table and Column Access Privileges", + "type": "boolean" }, - "port": { - "type": "integer", - "order": 2, - "title": "Port", - "default": 3306, - "maximum": 65536, - "minimum": 0, - "description": "Port of the database." + "checkpoint_target_interval_seconds": { + "default": 300, + "description": "How often (in seconds) a stream should checkpoint, when possible.", + "order": 11, + "title": "Checkpoint Target Time Interval", + "type": "integer" + }, + "concurrency": { + "default": 1, + "description": "Maximum number of concurrent queries to the database.", + "order": 12, + "title": "Concurrency", + "type": "integer" }, "database": { - "type": "string", + "always_show": true, + "description": "The database name.", "order": 6, "title": "Database", - "always_show": true, - "description": "The database name." + "type": "string" + }, + "host": { + "description": "Hostname of the database.", + "order": 1, + "title": "Host", + "type": "string" + }, + "jdbc_url_params": { + "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", + "order": 7, + "title": "JDBC URL Params", + "type": "string" }, "password": { - "type": "string", - "order": 5, - "title": "Password", + "airbyte_secret": true, "always_show": true, "description": "The password associated with the username.", - "airbyte_secret": true + "order": 5, + "title": "Password", + "type": "string" + }, + "port": { + "default": 3306, + "description": "Port of the database.", + "maximum": 65536, + "minimum": 0, + "order": 2, + "title": "Port", + "type": "integer" + }, + "replication_method": { + "description": "Configures how data is extracted from the database.", + "display_type": "radio", + "oneOf": [ + { + "additionalProperties": true, + "description": "Incrementally detects new inserts and updates using the cursor column chosen when configuring a connection (e.g. created_at, updated_at).", + "properties": { + "method": { + "default": "STANDARD", + "enum": ["STANDARD"], + "type": "string" + } + }, + "required": ["method"], + "title": "Scan Changes with User Defined Cursor", + "type": "object" + }, + { + "additionalProperties": true, + "description": "Recommended - Incrementally reads new inserts, updates, and deletes using MySQL's change data capture feature. This must be enabled on your database.", + "properties": { + "initial_load_timeout_hours": { + "always_show": true, + "default": 8, + "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", + "max": 24, + "min": 4, + "order": 4, + "title": "Initial Load Timeout in Hours (Advanced)", + "type": "integer" + }, + "initial_waiting_seconds": { + "always_show": true, + "default": 300, + "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time.", + "max": 1200, + "min": 120, + "order": 1, + "title": "Initial Waiting Time in Seconds (Advanced)", + "type": "integer" + }, + "invalid_cdc_cursor_position_behavior": { + "always_show": true, + "default": "Fail sync", + "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", + "enum": ["Fail sync", "Re-sync data"], + "order": 3, + "title": "Configured server timezone for the MySQL source (Advanced)", + "type": "string" + }, + "method": { + "default": "CDC", + "enum": ["CDC"], + "type": "string" + }, + "server_timezone": { + "always_show": true, + "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", + "order": 2, + "title": "Configured server timezone for the MySQL source (Advanced)", + "type": "string" + } + }, + "required": ["method"], + "title": "Read Changes using Change Data Capture (CDC)", + "type": "object" + } + ], + "order": 10, + "title": "Update Method", + "type": "object" }, "ssl_mode": { - "type": "object", + "description": "The encryption method with is used when communicating with the database.", "oneOf": [ { - "type": "object", - "title": "preferred", - "required": ["mode"], + "additionalProperties": true, + "description": "To allow unencrypted communication only when the source doesn't support encryption.", "properties": { "mode": { + "default": "preferred", "enum": ["preferred"], - "type": "string", - "default": "preferred" + "type": "string" } }, - "description": "To allow unencrypted communication only when the source doesn't support encryption.", - "additionalProperties": true + "required": ["mode"], + "title": "preferred", + "type": "object" }, { - "type": "object", - "title": "required", - "required": ["mode"], + "additionalProperties": true, + "description": "To always require encryption. Note: The connection will fail if the source doesn't support encryption.", "properties": { "mode": { + "default": "required", "enum": ["required"], - "type": "string", - "default": "required" + "type": "string" } }, - "description": "To always require encryption. Note: The connection will fail if the source doesn't support encryption.", - "additionalProperties": true + "required": ["mode"], + "title": "required", + "type": "object" }, { - "type": "object", - "title": "verify_ca", - "required": ["mode", "ca_certificate"], + "additionalProperties": true, + "description": "To always require encryption and verify that the source has a valid SSL certificate.", "properties": { - "mode": { - "enum": ["verify_ca"], - "type": "string", - "default": "verify_ca" - }, - "client_key": { - "type": "string", - "title": "Client Key", - "multiline": true, - "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", - "airbyte_secret": true - }, "ca_certificate": { - "type": "string", - "title": "CA certificate", - "multiline": true, + "airbyte_secret": true, "description": "CA certificate", - "airbyte_secret": true + "multiline": true, + "title": "CA certificate", + "type": "string" }, "client_certificate": { - "type": "string", + "airbyte_secret": true, + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "multiline": true, "title": "Client certificate File", + "type": "string" + }, + "client_key": { + "airbyte_secret": true, + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", "multiline": true, - "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", - "airbyte_secret": true + "title": "Client Key", + "type": "string" }, "client_key_password": { - "type": "string", - "title": "Client key password", - "multiline": true, + "airbyte_secret": true, "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", - "airbyte_secret": true + "multiline": true, + "title": "Client key password", + "type": "string" + }, + "mode": { + "default": "verify_ca", + "enum": ["verify_ca"], + "type": "string" } }, - "description": "To always require encryption and verify that the source has a valid SSL certificate.", - "additionalProperties": true + "required": ["mode", "ca_certificate"], + "title": "verify_ca", + "type": "object" }, { - "type": "object", - "title": "verify_identity", - "required": ["mode", "ca_certificate"], + "additionalProperties": true, + "description": "To always require encryption and verify that the source has a valid SSL certificate.", "properties": { - "mode": { - "enum": ["verify_identity"], - "type": "string", - "default": "verify_identity" - }, - "client_key": { - "type": "string", - "title": "Client Key", - "multiline": true, - "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", - "airbyte_secret": true - }, "ca_certificate": { - "type": "string", - "title": "CA certificate", - "multiline": true, + "airbyte_secret": true, "description": "CA certificate", - "airbyte_secret": true + "multiline": true, + "title": "CA certificate", + "type": "string" }, "client_certificate": { - "type": "string", + "airbyte_secret": true, + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "multiline": true, "title": "Client certificate File", + "type": "string" + }, + "client_key": { + "airbyte_secret": true, + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", "multiline": true, - "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", - "airbyte_secret": true + "title": "Client Key", + "type": "string" }, "client_key_password": { - "type": "string", - "title": "Client key password", - "multiline": true, + "airbyte_secret": true, "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", - "airbyte_secret": true + "multiline": true, + "title": "Client key password", + "type": "string" + }, + "mode": { + "default": "verify_identity", + "enum": ["verify_identity"], + "type": "string" } }, - "description": "To always require encryption and verify that the source has a valid SSL certificate.", - "additionalProperties": true + "required": ["mode", "ca_certificate"], + "title": "verify_identity", + "type": "object" } ], "order": 8, "title": "Encryption", - "description": "The encryption method with is used when communicating with the database." - }, - "username": { - "type": "string", - "order": 4, - "title": "User", - "description": "The username which is used to access the database." - }, - "concurrency": { - "type": "integer", - "order": 12, - "title": "Concurrency", - "default": 1, - "description": "Maximum number of concurrent queries to the database." + "type": "object" }, "tunnel_method": { - "type": "object", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", "oneOf": [ { - "type": "object", - "title": "No Tunnel", - "required": ["tunnel_method"], + "additionalProperties": true, + "description": "No ssh tunnel needed to connect to database", "properties": { "tunnel_method": { + "default": "NO_TUNNEL", "enum": ["NO_TUNNEL"], - "type": "string", - "default": "NO_TUNNEL" + "type": "string" } }, - "description": "No ssh tunnel needed to connect to database", - "additionalProperties": true + "required": ["tunnel_method"], + "title": "No Tunnel", + "type": "object" }, { - "type": "object", - "title": "SSH Key Authentication", - "required": [ - "tunnel_method", - "tunnel_host", - "tunnel_port", - "tunnel_user", - "ssh_key" - ], + "additionalProperties": true, + "description": "Connect through a jump server tunnel host using username and ssh key", "properties": { "ssh_key": { - "type": "string", + "airbyte_secret": true, + "description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )", + "multiline": true, "order": 4, "title": "SSH Private Key", - "multiline": true, - "description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )", - "airbyte_secret": true + "type": "string" }, "tunnel_host": { - "type": "string", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", "order": 1, "title": "SSH Tunnel Jump Server Host", - "description": "Hostname of the jump server host that allows inbound ssh tunnel." + "type": "string" + }, + "tunnel_method": { + "default": "SSH_KEY_AUTH", + "enum": ["SSH_KEY_AUTH"], + "type": "string" }, "tunnel_port": { - "type": "integer", - "order": 2, - "title": "SSH Connection Port", "default": 22, + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", "maximum": 65536, "minimum": 0, - "description": "Port on the proxy/jump server that accepts inbound ssh connections." + "order": 2, + "title": "SSH Connection Port", + "type": "integer" }, "tunnel_user": { - "type": "string", + "description": "OS-level username for logging into the jump server host", "order": 3, "title": "SSH Login Username", - "description": "OS-level username for logging into the jump server host" - }, - "tunnel_method": { - "enum": ["SSH_KEY_AUTH"], - "type": "string", - "default": "SSH_KEY_AUTH" + "type": "string" } }, - "description": "Connect through a jump server tunnel host using username and ssh key", - "additionalProperties": true - }, - { - "type": "object", - "title": "Password Authentication", "required": [ "tunnel_method", "tunnel_host", "tunnel_port", "tunnel_user", - "tunnel_user_password" + "ssh_key" ], + "title": "SSH Key Authentication", + "type": "object" + }, + { + "additionalProperties": true, + "description": "Connect through a jump server tunnel host using username and password authentication", "properties": { "tunnel_host": { - "type": "string", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", "order": 1, "title": "SSH Tunnel Jump Server Host", - "description": "Hostname of the jump server host that allows inbound ssh tunnel." + "type": "string" + }, + "tunnel_method": { + "default": "SSH_PASSWORD_AUTH", + "enum": ["SSH_PASSWORD_AUTH"], + "type": "string" }, "tunnel_port": { - "type": "integer", - "order": 2, - "title": "SSH Connection Port", "default": 22, + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", "maximum": 65536, "minimum": 0, - "description": "Port on the proxy/jump server that accepts inbound ssh connections." + "order": 2, + "title": "SSH Connection Port", + "type": "integer" }, "tunnel_user": { - "type": "string", + "description": "OS-level username for logging into the jump server host", "order": 3, "title": "SSH Login Username", - "description": "OS-level username for logging into the jump server host" - }, - "tunnel_method": { - "enum": ["SSH_PASSWORD_AUTH"], - "type": "string", - "default": "SSH_PASSWORD_AUTH" + "type": "string" }, "tunnel_user_password": { - "type": "string", + "airbyte_secret": true, + "description": "OS-level password for logging into the jump server host", "order": 4, "title": "Password", - "description": "OS-level password for logging into the jump server host", - "airbyte_secret": true + "type": "string" } }, - "description": "Connect through a jump server tunnel host using username and password authentication", - "additionalProperties": true + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_port", + "tunnel_user", + "tunnel_user_password" + ], + "title": "Password Authentication", + "type": "object" } ], "order": 9, "title": "SSH Tunnel Method", - "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use." - }, - "jdbc_url_params": { - "type": "string", - "order": 7, - "title": "JDBC URL Params", - "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)." - }, - "check_privileges": { - "type": "boolean", - "order": 13, - "title": "Check Table and Column Access Privileges", - "default": true, - "description": "When this feature is enabled, during schema discovery the connector will query each table or view individually to check access privileges and inaccessible tables, views, or columns therein will be removed. In large schemas, this might cause schema discovery to take too long, in which case it might be advisable to disable this feature." + "type": "object" }, - "replication_method": { - "type": "object", - "oneOf": [ - { - "type": "object", - "title": "Scan Changes with User Defined Cursor", - "required": ["method"], - "properties": { - "method": { - "enum": ["STANDARD"], - "type": "string", - "default": "STANDARD" - } - }, - "description": "Incrementally detects new inserts and updates using the cursor column chosen when configuring a connection (e.g. created_at, updated_at).", - "additionalProperties": true - }, - { - "type": "object", - "title": "Read Changes using Change Data Capture (CDC)", - "required": ["method"], - "properties": { - "method": { - "enum": ["CDC"], - "type": "string", - "default": "CDC" - }, - "server_timezone": { - "type": "string", - "order": 2, - "title": "Configured server timezone for the MySQL source (Advanced)", - "always_show": true, - "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard." - }, - "initial_waiting_seconds": { - "max": 1200, - "min": 120, - "type": "integer", - "order": 1, - "title": "Initial Waiting Time in Seconds (Advanced)", - "default": 300, - "always_show": true, - "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time." - }, - "initial_load_timeout_hours": { - "max": 24, - "min": 4, - "type": "integer", - "order": 4, - "title": "Initial Load Timeout in Hours (Advanced)", - "default": 8, - "always_show": true, - "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs." - }, - "invalid_cdc_cursor_position_behavior": { - "enum": ["Fail sync", "Re-sync data"], - "default": "Fail sync", - "type": "string", - "order": 3, - "title": "Configured server timezone for the MySQL source (Advanced)", - "always_show": true, - "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard." - } - }, - "description": "Recommended - Incrementally reads new inserts, updates, and deletes using Mysql's change data capture feature. This must be enabled on your database.", - "additionalProperties": true - } - ], - "order": 10, - "title": "Update Method", - "description": "Configures how data is extracted from the database.", - "display_type": "radio" - }, - "checkpoint_target_interval_seconds": { - "type": "integer", - "order": 11, - "title": "Checkpoint Target Time Interval", - "default": 300, - "description": "How often (in seconds) a stream should checkpoint, when possible." + "username": { + "description": "The username which is used to access the database.", + "order": 4, + "title": "User", + "type": "string" } }, - "additionalProperties": true + "required": ["host", "port", "database", "username", "replication_method"], + "title": "MySQL Source Spec", + "type": "object" }, - "supportsNormalization": false, + "documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql", + "supported_destination_sync_modes": [], "supportsDBT": false, - "supported_destination_sync_modes": [] + "supportsNormalization": false } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json deleted file mode 100644 index b76358180e65..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json +++ /dev/null @@ -1,343 +0,0 @@ -{ - "documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql", - "connectionSpecification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "MySql Source Spec", - "type": "object", - "required": ["host", "port", "database", "username", "replication_method"], - "properties": { - "host": { - "description": "The host name of the database.", - "title": "Host", - "type": "string", - "order": 0 - }, - "port": { - "description": "The port to connect to.", - "title": "Port", - "type": "integer", - "minimum": 0, - "maximum": 65536, - "default": 3306, - "examples": ["3306"], - "order": 1 - }, - "database": { - "description": "The database name.", - "title": "Database", - "type": "string", - "order": 2 - }, - "username": { - "description": "The username which is used to access the database.", - "title": "Username", - "type": "string", - "order": 3 - }, - "password": { - "description": "The password associated with the username.", - "title": "Password", - "type": "string", - "airbyte_secret": true, - "order": 4, - "always_show": true - }, - "jdbc_url_params": { - "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). For more information read about JDBC URL parameters.", - "title": "JDBC URL Parameters (Advanced)", - "type": "string", - "order": 5 - }, - "ssl_mode": { - "title": "SSL modes", - "description": "SSL connection modes. Read more in the docs.", - "type": "object", - "order": 7, - "oneOf": [ - { - "title": "preferred", - "description": "Automatically attempt SSL connection. If the MySQL server does not support SSL, continue with a regular connection.", - "required": ["mode"], - "properties": { - "mode": { "type": "string", "const": "preferred", "order": 0 } - } - }, - { - "title": "required", - "description": "Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.", - "required": ["mode"], - "properties": { - "mode": { "type": "string", "const": "required", "order": 0 } - } - }, - { - "title": "Verify CA", - "description": "Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.", - "required": ["mode", "ca_certificate"], - "properties": { - "mode": { "type": "string", "const": "verify_ca", "order": 0 }, - "ca_certificate": { - "type": "string", - "title": "CA certificate", - "description": "CA certificate", - "airbyte_secret": true, - "multiline": true, - "order": 1 - }, - "client_certificate": { - "type": "string", - "title": "Client certificate", - "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", - "airbyte_secret": true, - "multiline": true, - "order": 2, - "always_show": true - }, - "client_key": { - "type": "string", - "title": "Client key", - "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", - "airbyte_secret": true, - "multiline": true, - "order": 3, - "always_show": true - }, - "client_key_password": { - "type": "string", - "title": "Client key password", - "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", - "airbyte_secret": true, - "order": 4 - } - } - }, - { - "title": "Verify Identity", - "description": "Always connect with SSL. Verify both CA and Hostname.", - "required": ["mode", "ca_certificate"], - "properties": { - "mode": { - "type": "string", - "const": "verify_identity", - "order": 0 - }, - "ca_certificate": { - "type": "string", - "title": "CA certificate", - "description": "CA certificate", - "airbyte_secret": true, - "multiline": true, - "order": 1 - }, - "client_certificate": { - "type": "string", - "title": "Client certificate", - "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", - "airbyte_secret": true, - "multiline": true, - "order": 2, - "always_show": true - }, - "client_key": { - "type": "string", - "title": "Client key", - "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", - "airbyte_secret": true, - "multiline": true, - "order": 3, - "always_show": true - }, - "client_key_password": { - "type": "string", - "title": "Client key password", - "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", - "airbyte_secret": true, - "order": 4 - } - } - } - ], - "default": "required" - }, - "replication_method": { - "type": "object", - "title": "Update Method", - "description": "Configures how data is extracted from the database.", - "order": 8, - "default": "CDC", - "display_type": "radio", - "oneOf": [ - { - "title": "Read Changes using Binary Log (CDC)", - "description": "Recommended - Incrementally reads new inserts, updates, and deletes using the MySQL binary log. This must be enabled on your database.", - "required": ["method"], - "properties": { - "method": { "type": "string", "const": "CDC", "order": 0 }, - "initial_waiting_seconds": { - "type": "integer", - "title": "Initial Waiting Time in Seconds (Advanced)", - "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time.", - "default": 300, - "min": 120, - "max": 1200, - "order": 1, - "always_show": true - }, - "server_time_zone": { - "type": "string", - "title": "Configured server timezone for the MySQL source (Advanced)", - "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", - "order": 2, - "always_show": true - }, - "invalid_cdc_cursor_position_behavior": { - "type": "string", - "title": "Invalid CDC position behavior (Advanced)", - "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", - "enum": ["Fail sync", "Re-sync data"], - "default": "Fail sync", - "order": 3, - "always_show": true - }, - "initial_load_timeout_hours": { - "type": "integer", - "title": "Initial Load Timeout in Hours (Advanced)", - "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", - "default": 8, - "min": 4, - "max": 24, - "order": 4, - "always_show": true - } - } - }, - { - "title": "Scan Changes with User Defined Cursor", - "description": "Incrementally detects new inserts and updates using the cursor column chosen when configuring a connection (e.g. created_at, updated_at).", - "required": ["method"], - "properties": { - "method": { "type": "string", "const": "STANDARD", "order": 0 } - } - } - ] - }, - "tunnel_method": { - "type": "object", - "title": "SSH Tunnel Method", - "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", - "oneOf": [ - { - "title": "No Tunnel", - "required": ["tunnel_method"], - "properties": { - "tunnel_method": { - "description": "No ssh tunnel needed to connect to database", - "type": "string", - "const": "NO_TUNNEL", - "order": 0 - } - } - }, - { - "title": "SSH Key Authentication", - "required": [ - "tunnel_method", - "tunnel_host", - "tunnel_port", - "tunnel_user", - "ssh_key" - ], - "properties": { - "tunnel_method": { - "description": "Connect through a jump server tunnel host using username and ssh key", - "type": "string", - "const": "SSH_KEY_AUTH", - "order": 0 - }, - "tunnel_host": { - "title": "SSH Tunnel Jump Server Host", - "description": "Hostname of the jump server host that allows inbound ssh tunnel.", - "type": "string", - "order": 1 - }, - "tunnel_port": { - "title": "SSH Connection Port", - "description": "Port on the proxy/jump server that accepts inbound ssh connections.", - "type": "integer", - "minimum": 0, - "maximum": 65536, - "default": 22, - "examples": ["22"], - "order": 2 - }, - "tunnel_user": { - "title": "SSH Login Username", - "description": "OS-level username for logging into the jump server host.", - "type": "string", - "order": 3 - }, - "ssh_key": { - "title": "SSH Private Key", - "description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )", - "type": "string", - "airbyte_secret": true, - "multiline": true, - "order": 4 - } - } - }, - { - "title": "Password Authentication", - "required": [ - "tunnel_method", - "tunnel_host", - "tunnel_port", - "tunnel_user", - "tunnel_user_password" - ], - "properties": { - "tunnel_method": { - "description": "Connect through a jump server tunnel host using username and password authentication", - "type": "string", - "const": "SSH_PASSWORD_AUTH", - "order": 0 - }, - "tunnel_host": { - "title": "SSH Tunnel Jump Server Host", - "description": "Hostname of the jump server host that allows inbound ssh tunnel.", - "type": "string", - "order": 1 - }, - "tunnel_port": { - "title": "SSH Connection Port", - "description": "Port on the proxy/jump server that accepts inbound ssh connections.", - "type": "integer", - "minimum": 0, - "maximum": 65536, - "default": 22, - "examples": ["22"], - "order": 2 - }, - "tunnel_user": { - "title": "SSH Login Username", - "description": "OS-level username for logging into the jump server host", - "type": "string", - "order": 3 - }, - "tunnel_user_password": { - "title": "Password", - "description": "OS-level password for logging into the jump server host", - "type": "string", - "airbyte_secret": true, - "order": 4 - } - } - } - ] - } - } - }, - "supportsNormalization": false, - "supportsDBT": false, - "supported_destination_sync_modes": [] -} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json deleted file mode 100644 index d45898990ba5..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json +++ /dev/null @@ -1,367 +0,0 @@ -{ - "documentationUrl": "https://docs.airbyte.com/integrations/sources/mysql", - "connectionSpecification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "MySql Source Spec", - "type": "object", - "required": ["host", "port", "database", "username", "replication_method"], - "properties": { - "host": { - "description": "The host name of the database.", - "title": "Host", - "type": "string", - "order": 0 - }, - "port": { - "description": "The port to connect to.", - "title": "Port", - "type": "integer", - "minimum": 0, - "maximum": 65536, - "default": 3306, - "examples": ["3306"], - "order": 1 - }, - "database": { - "description": "The database name.", - "title": "Database", - "type": "string", - "order": 2 - }, - "username": { - "description": "The username which is used to access the database.", - "title": "Username", - "type": "string", - "order": 3 - }, - "password": { - "description": "The password associated with the username.", - "title": "Password", - "type": "string", - "airbyte_secret": true, - "order": 4, - "always_show": true - }, - "jdbc_url_params": { - "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). For more information read about JDBC URL parameters.", - "title": "JDBC URL Parameters (Advanced)", - "type": "string", - "order": 5 - }, - "ssl": { - "title": "SSL Connection", - "description": "Encrypt data using SSL.", - "type": "boolean", - "default": true, - "order": 6 - }, - "ssl_mode": { - "title": "SSL modes", - "description": "SSL connection modes. Read more in the docs.", - "type": "object", - "order": 7, - "oneOf": [ - { - "title": "preferred", - "description": "Automatically attempt SSL connection. If the MySQL server does not support SSL, continue with a regular connection.", - "required": ["mode"], - "properties": { - "mode": { - "type": "string", - "const": "preferred", - "order": 0 - } - } - }, - { - "title": "required", - "description": "Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.", - "required": ["mode"], - "properties": { - "mode": { - "type": "string", - "const": "required", - "order": 0 - } - } - }, - { - "title": "Verify CA", - "description": "Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.", - "required": ["mode", "ca_certificate"], - "properties": { - "mode": { - "type": "string", - "const": "verify_ca", - "order": 0 - }, - "ca_certificate": { - "type": "string", - "title": "CA certificate", - "description": "CA certificate", - "airbyte_secret": true, - "multiline": true, - "order": 1 - }, - "client_certificate": { - "type": "string", - "title": "Client certificate", - "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", - "airbyte_secret": true, - "multiline": true, - "order": 2, - "always_show": true - }, - "client_key": { - "type": "string", - "title": "Client key", - "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", - "airbyte_secret": true, - "multiline": true, - "order": 3, - "always_show": true - }, - "client_key_password": { - "type": "string", - "title": "Client key password", - "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", - "airbyte_secret": true, - "order": 4 - } - } - }, - { - "title": "Verify Identity", - "description": "Always connect with SSL. Verify both CA and Hostname.", - "required": ["mode", "ca_certificate"], - "properties": { - "mode": { - "type": "string", - "const": "verify_identity", - "order": 0 - }, - "ca_certificate": { - "type": "string", - "title": "CA certificate", - "description": "CA certificate", - "airbyte_secret": true, - "multiline": true, - "order": 1 - }, - "client_certificate": { - "type": "string", - "title": "Client certificate", - "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", - "airbyte_secret": true, - "multiline": true, - "order": 2, - "always_show": true - }, - "client_key": { - "type": "string", - "title": "Client key", - "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", - "airbyte_secret": true, - "multiline": true, - "order": 3, - "always_show": true - }, - "client_key_password": { - "type": "string", - "title": "Client key password", - "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", - "airbyte_secret": true, - "order": 4 - } - } - } - ] - }, - "replication_method": { - "type": "object", - "title": "Update Method", - "description": "Configures how data is extracted from the database.", - "order": 8, - "default": "CDC", - "display_type": "radio", - "oneOf": [ - { - "title": "Read Changes using Binary Log (CDC)", - "description": "Recommended - Incrementally reads new inserts, updates, and deletes using the MySQL binary log. This must be enabled on your database.", - "required": ["method"], - "properties": { - "method": { - "type": "string", - "const": "CDC", - "order": 0 - }, - "initial_waiting_seconds": { - "type": "integer", - "title": "Initial Waiting Time in Seconds (Advanced)", - "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time.", - "default": 300, - "min": 120, - "max": 1200, - "order": 1, - "always_show": true - }, - "server_time_zone": { - "type": "string", - "title": "Configured server timezone for the MySQL source (Advanced)", - "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", - "order": 2, - "always_show": true - }, - "invalid_cdc_cursor_position_behavior": { - "type": "string", - "title": "Invalid CDC position behavior (Advanced)", - "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", - "enum": ["Fail sync", "Re-sync data"], - "default": "Fail sync", - "order": 3, - "always_show": true - }, - "initial_load_timeout_hours": { - "type": "integer", - "title": "Initial Load Timeout in Hours (Advanced)", - "description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.", - "default": 8, - "min": 4, - "max": 24, - "order": 4, - "always_show": true - } - } - }, - { - "title": "Scan Changes with User Defined Cursor", - "description": "Incrementally detects new inserts and updates using the cursor column chosen when configuring a connection (e.g. created_at, updated_at).", - "required": ["method"], - "properties": { - "method": { - "type": "string", - "const": "STANDARD", - "order": 0 - } - } - } - ] - }, - "tunnel_method": { - "type": "object", - "title": "SSH Tunnel Method", - "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", - "oneOf": [ - { - "title": "No Tunnel", - "required": ["tunnel_method"], - "properties": { - "tunnel_method": { - "description": "No ssh tunnel needed to connect to database", - "type": "string", - "const": "NO_TUNNEL", - "order": 0 - } - } - }, - { - "title": "SSH Key Authentication", - "required": [ - "tunnel_method", - "tunnel_host", - "tunnel_port", - "tunnel_user", - "ssh_key" - ], - "properties": { - "tunnel_method": { - "description": "Connect through a jump server tunnel host using username and ssh key", - "type": "string", - "const": "SSH_KEY_AUTH", - "order": 0 - }, - "tunnel_host": { - "title": "SSH Tunnel Jump Server Host", - "description": "Hostname of the jump server host that allows inbound ssh tunnel.", - "type": "string", - "order": 1 - }, - "tunnel_port": { - "title": "SSH Connection Port", - "description": "Port on the proxy/jump server that accepts inbound ssh connections.", - "type": "integer", - "minimum": 0, - "maximum": 65536, - "default": 22, - "examples": ["22"], - "order": 2 - }, - "tunnel_user": { - "title": "SSH Login Username", - "description": "OS-level username for logging into the jump server host.", - "type": "string", - "order": 3 - }, - "ssh_key": { - "title": "SSH Private Key", - "description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )", - "type": "string", - "airbyte_secret": true, - "multiline": true, - "order": 4 - } - } - }, - { - "title": "Password Authentication", - "required": [ - "tunnel_method", - "tunnel_host", - "tunnel_port", - "tunnel_user", - "tunnel_user_password" - ], - "properties": { - "tunnel_method": { - "description": "Connect through a jump server tunnel host using username and password authentication", - "type": "string", - "const": "SSH_PASSWORD_AUTH", - "order": 0 - }, - "tunnel_host": { - "title": "SSH Tunnel Jump Server Host", - "description": "Hostname of the jump server host that allows inbound ssh tunnel.", - "type": "string", - "order": 1 - }, - "tunnel_port": { - "title": "SSH Connection Port", - "description": "Port on the proxy/jump server that accepts inbound ssh connections.", - "type": "integer", - "minimum": 0, - "maximum": 65536, - "default": 22, - "examples": ["22"], - "order": 2 - }, - "tunnel_user": { - "title": "SSH Login Username", - "description": "OS-level username for logging into the jump server host", - "type": "string", - "order": 3 - }, - "tunnel_user_password": { - "title": "Password", - "description": "OS-level password for logging into the jump server host", - "type": "string", - "airbyte_secret": true, - "order": 4 - } - } - } - ] - } - } - }, - "supported_destination_sync_modes": [] -} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/test.png b/airbyte-integrations/connectors/source-mysql/src/test/resources/test.png deleted file mode 100644 index ca452bd25e3c..000000000000 Binary files a/airbyte-integrations/connectors/source-mysql/src/test/resources/test.png and /dev/null differ diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 0817f2c8793e..1693a64d4360 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.9.4 | 2024-12-18 | [49939](https://github.com/airbytehq/airbyte/pull/49939) | Pin Bulk CDK version to 226, rename classes. | | 3.9.3 | 2024-12-18 | [49932](https://github.com/airbytehq/airbyte/pull/49932) | Backward compatibility for saved states with timestamp that include timezone offset. | | 3.9.2 | 2024-12-16 | [49830](https://github.com/airbytehq/airbyte/pull/49830) | Fixes an issue with auto generated tinyint columns | | 3.9.1 | 2024-12-12 | [49456](https://github.com/airbytehq/airbyte/pull/49456) | Bump version to re-relase |