From 9677de596bb304d8ba9038a0049e7a5db8d11edf Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Wed, 18 Dec 2024 16:36:48 -0800 Subject: [PATCH] passing tests --- .../cdk/read/cdc/CdcPartitionsCreator.kt | 2 +- .../cdk/read/cdc/DebeziumOperations.kt | 2 +- .../read/cdc/CdcPartitionReaderMongoTest.kt | 108 +++++----- .../cdc/CdcPartitionReaderPostgresTest.kt | 110 +++++----- .../cdk/read/cdc/CdcPartitionsCreatorTest.kt | 4 +- .../cdc/AbstractCdcPartitionReaderTest.kt | 197 ++++++++++-------- .../mssql/MsSqlServerDebeziumOperations.kt | 4 +- .../MsSqlServerCdcPartitionReaderTest.kt | 49 +---- .../mysql/cdc/MySqlDebeziumOperations.kt | 6 +- .../mysql/MysqlCdcPartitionReaderTest.kt | 78 ++++--- 10 files changed, 288 insertions(+), 272 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt index caeb35160561..80399fdf8e40 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt @@ -51,7 +51,7 @@ class CdcPartitionsCreator>( val activeStreams: List by lazy { feedBootstrap.feed.streams.filter { feedBootstrap.stateQuerier.current(it) != null } } - val syntheticInput: DebeziumInput by lazy { creatorOps.synthesize() } + val syntheticInput: DebeziumInput by lazy { creatorOps.synthesize(feedBootstrap.feed.streams) } // Ensure that the WAL position upper bound has been computed for this sync. val upperBound: T = upperBoundReference.updateAndGet { diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt index c425f5fd6d96..c98e95034ba5 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt @@ -21,7 +21,7 @@ interface CdcPartitionsCreatorDebeziumOperations> { fun position(offset: DebeziumOffset): T /** Synthesizes a [DebeziumInput] when no incumbent [OpaqueStateValue] is available. */ - fun synthesize(): DebeziumInput + fun synthesize(streams: List): DebeziumInput /** Builds a [DebeziumInput] using an incumbent [OpaqueStateValue]. */ fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List): DebeziumInput diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt index b453643a632f..debb823472eb 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt @@ -10,6 +10,7 @@ import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates +import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons import io.debezium.connector.mongodb.MongoDbConnector @@ -76,7 +77,7 @@ class CdcPartitionReaderMongoTest : fn(it.getCollection(stream.name)) } - override fun getCdcOperations(): CdcPartitionReaderDebeziumOperations { + override fun getCdcOperations(): DebeziumOperations { return object : AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? { val resumeToken: String = @@ -90,6 +91,13 @@ class CdcPartitionReaderMongoTest : return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64)) } + override fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List): DebeziumInput { + return super.deserialize(opaqueStateValue, streams).let { + DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) + + } + } + override fun deserialize( key: DebeziumRecordKey, value: DebeziumRecordValue, @@ -122,61 +130,61 @@ class CdcPartitionReaderMongoTest : changes = emptyMap(), ) } - } - } - override fun MongoDbReplicaSet.currentPosition(): BsonTimestamp = - ResumeTokens.getTimestamp(currentResumeToken()) - - override fun MongoDbReplicaSet.syntheticInput(): DebeziumInput { - val resumeToken: BsonDocument = currentResumeToken() - val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) - val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value - val key: ArrayNode = - Jsons.arrayNode().apply { - add(stream.namespace) - add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ord", timestamp.inc) - put("sec", timestamp.time) - put("resume_token", resumeTokenString) + override fun position(offset: DebeziumOffset): BsonTimestamp { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt()) } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } - private fun MongoDbReplicaSet.currentResumeToken(): BsonDocument = - withMongoDatabase { mongoDatabase: MongoDatabase -> - val pipeline = listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) - mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { - it.tryNext() - it.resumeToken!! + override fun synthesize(streams: List): DebeziumInput { + val resumeToken: BsonDocument = currentResumeToken() + val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) + val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value + val key: ArrayNode = + Jsons.arrayNode().apply { + add(stream.namespace) + add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ord", timestamp.inc) + put("sec", timestamp.time) + put("resume_token", resumeTokenString) + } + val offset = DebeziumOffset(mapOf(key to value)) + val state = DebeziumState(offset, schemaHistory = null) + val syntheticProperties: Map = debeziumProperties() + return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - } - override fun MongoDbReplicaSet.debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MongoDbConnector::class.java) - .withDebeziumName(stream.namespace!!) - .withHeartbeats(heartbeat) - .with("capture.scope", "database") - .with("capture.target", stream.namespace!!) - .with("mongodb.connection.string", connectionString) - .with("snapshot.mode", "no_data") - .with( + fun currentResumeToken(): BsonDocument = + container.withMongoDatabase { mongoDatabase: MongoDatabase -> + val pipeline = listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) + mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { + it.tryNext() + it.resumeToken!! + } + } + + fun debeziumProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MongoDbConnector::class.java) + .withDebeziumName(stream.namespace!!) + .withHeartbeats(heartbeat) + .with("capture.scope", "database") + .with("capture.target", stream.namespace!!) + .with("mongodb.connection.string", container.connectionString) + .with("snapshot.mode", "no_data") + .with( "collection.include.list", DebeziumPropertiesBuilder.joinIncludeList( - listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) + listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) ) - ) - .with("database.include.list", stream.namespace!!) - .withOffset() - .buildMap() - - + ) + .with("database.include.list", stream.namespace!!) + .withOffset() + .buildMap() + } + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt index b693bcf4bf41..1ae9bb893a3a 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt @@ -10,6 +10,7 @@ import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates +import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons @@ -85,8 +86,13 @@ class CdcPartitionReaderPostgresTest : connection.createStatement().use { fn(it) } } - override fun getCdcOperations(): CdcPartitionReaderDebeziumOperations { + override fun getCdcOperations(): DebeziumOperations { return object: AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { + override fun position(offset: DebeziumOffset): LogSequenceNumber { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return LogSequenceNumber.valueOf(offsetValue["lsn"].asLong()) + } + override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? { val lsn: Long = recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null @@ -98,63 +104,63 @@ class CdcPartitionReaderPostgresTest : val lsn: Long = offset["lsn"] as? Long ?: return null return LogSequenceNumber.valueOf(lsn) } - } - } + override fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List): DebeziumInput { + return super.deserialize(opaqueStateValue, streams).let { + DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) - override fun PostgreSQLContainer<*>.currentPosition(): LogSequenceNumber = - withStatement { statement: Statement -> - statement.executeQuery("SELECT pg_current_wal_lsn()").use { - it.next() - LogSequenceNumber.valueOf(it.getString(1)) - } - } - - override fun PostgreSQLContainer<*>.syntheticInput(): DebeziumInput { - val (position: LogSequenceNumber, txID: Long) = - withStatement { statement: Statement -> - statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { - it.next() - LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) } } - val timestamp: Instant = Instant.now() - val key: ArrayNode = - Jsons.arrayNode().apply { - add(databaseName) - add(Jsons.objectNode().apply { put("server", databaseName) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ts_usec", timestamp.toEpochMilli() * 1000L) - put("lsn", position.asLong()) - put("txId", txID) + + override fun synthesize(streams: List): DebeziumInput { + val (position: LogSequenceNumber, txID: Long) = + container.withStatement { statement: Statement -> + statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { + it.next() + LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) + } + } + val timestamp: Instant = Instant.now() + val key: ArrayNode = + Jsons.arrayNode().apply { + add(container.databaseName) + add(Jsons.objectNode().apply { put("server", container.databaseName) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ts_usec", timestamp.toEpochMilli() * 1000L) + put("lsn", position.asLong()) + put("txId", txID) + } + val offset = DebeziumOffset(mapOf(key to value)) + val state = DebeziumState(offset, schemaHistory = null) + val syntheticProperties: Map = debeziumProperties() + return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } - override fun PostgreSQLContainer<*>.debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(PostgresConnector::class.java) - .withDebeziumName(databaseName) - .withHeartbeats(heartbeat) - .with("plugin.name", "pgoutput") - .with("slot.name", SLOT_NAME) - .with("publication.name", PUBLICATION_NAME) - .with("publication.autocreate.mode", "disabled") - .with("flush.lsn.source", "false") - .withDatabase("hostname", host) - .withDatabase("port", firstMappedPort.toString()) - .withDatabase("user", username) - .withDatabase("password", password) - .withDatabase("dbname", databaseName) - .withOffset() - .withStreams(listOf(stream)) - .buildMap() + fun debeziumProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(PostgresConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("plugin.name", "pgoutput") + .with("slot.name", SLOT_NAME) + .with("publication.name", PUBLICATION_NAME) + .with("publication.autocreate.mode", "disabled") + .with("flush.lsn.source", "false") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withOffset() + .withStreams(listOf(stream)) + .buildMap() + } + + + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt index c96d6dbd8a57..bd0796a7555a 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt @@ -90,7 +90,7 @@ class CdcPartitionsCreatorTest { every { stateQuerier.feeds } returns listOf(global, stream) every { creatorOps.position(syntheticOffset) } returns 123L every { creatorOps.position(incumbentOffset) } returns 123L - every { creatorOps.synthesize() } returns syntheticInput + every { creatorOps.synthesize(listOf(stream)) } returns syntheticInput } @Test @@ -105,7 +105,7 @@ class CdcPartitionsCreatorTest { state = DebeziumState(offset = syntheticOffset, schemaHistory = null), isSynthetic = true, ) - every { creatorOps.synthesize() } returns syntheticInput + every { creatorOps.synthesize(listOf(stream)) } returns syntheticInput upperBoundReference.set(null) val readers: List = runBlocking { creator.run() } Assertions.assertEquals(1, readers.size) diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt index 546a61791565..807eadbd6094 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt @@ -49,7 +49,6 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab val timeout: Duration = Duration.ofSeconds(10), ) { val log = KotlinLogging.logger { } - val container: C = createContainer() val stream = Stream( id = StreamIdentifier.from(StreamDescriptor().withName("tbl").withNamespace(namespace)), @@ -58,20 +57,19 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab configuredPrimaryKey = null, configuredCursor = TestMetaFieldDecorator.GlobalCursor, ) - val cdcOperations = createCdcOperations() val global: Global get() = Global(listOf(stream)) + val container: C = createContainer() + val debeziumOperations = getCdcOperations() + abstract fun createContainer(): C abstract fun C.createStream() abstract fun C.insert12345() abstract fun C.update135() abstract fun C.delete24() - abstract fun C.currentPosition(): T - abstract fun C.debeziumProperties(): Map - @Test /** * The [integrationTest] method sets up (and tears down) a testcontainer for the data source @@ -82,67 +80,86 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab * [syntheticInput] and [debeziumProperties], and exercises all [PartitionReader] methods. */ fun integrationTest() { - container.createStream() - log.info{"SGXX 1"} - val p0: T = container.currentPosition() - log.info{"SGXX 2"} - val syntheticInput = cdcOperations.synthesize() - log.info{"SGXX 3"} - val r0: ReadResult = read(syntheticInput, p0) - log.info{"SGXX 4"} - Assertions.assertEquals(emptyList(), r0.records) - Assertions.assertNotEquals( - CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, - r0.closeReason, - ) - - container.insert12345() - val insert = - listOf( - Insert(1, 1), - Insert(2, 2), - Insert(3, 3), - Insert(4, 4), - Insert(5, 5), + container.use { container: C -> + container.createStream() + log.info{"SGXX 1"} + val p0: T = debeziumOperations.position(debeziumOperations.synthesize(listOf(stream)).state.offset) + log.info{"SGXX 2"} + val syntheticInput = debeziumOperations.synthesize(listOf(stream)) + val r0: ReadResult = read(syntheticInput, p0) + log.info{"SGXX 4"} + Assertions.assertEquals(emptyList(), r0.records) + Assertions.assertNotEquals( + CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, + r0.closeReason, ) - container.update135() - val update = - listOf( - Update(1, 6), - Update(3, 7), - Update(5, 8), + + container.insert12345() + val insert = + listOf( + Insert(1, 1), + Insert(2, 2), + Insert(3, 3), + Insert(4, 4), + Insert(5, 5), + ) + container.update135() + val update = + listOf( + Update(1, 6), + Update(3, 7), + Update(5, 8), + ) + log.info{"SGXX 5"} + val p1: T = debeziumOperations.position(debeziumOperations.synthesize(listOf(stream)).state.offset) + log.info{"SGXX 6"} + container.delete24() + val delete = + listOf( + Delete(2), + Delete(4), + ) + log.info{"SGXX 7"} + val p2: T = debeziumOperations.position(debeziumOperations.synthesize(listOf(stream)).state.offset) + log.info{"SGXX 8"} + + val input = debeziumOperations.deserialize(debeziumOperations.serialize(r0.state), listOf(stream)) + log.info{"SGXX 9"} + + val r1: ReadResult = read(input, p1) + log.info{"SGXX 10"} + Assertions.assertEquals(insert + update, r1.records.take(insert.size + update.size)) + Assertions.assertNotNull(r1.closeReason) + log.info{"SGXX 11"} + val r2: ReadResult = read(input, p2) + log.info{"SGXX 12"} + Assertions.assertEquals( + insert + update + delete, + r2.records.take(insert.size + update.size + delete.size), ) - log.info{"SGXX 5"} - val p1: T = container.currentPosition() - log.info{"SGXX 6"} - container.delete24() - val delete = - listOf( - Delete(2), - Delete(4), + Assertions.assertNotNull(r2.closeReason) + Assertions.assertNotEquals( + CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, + r2.closeReason ) - log.info{"SGXX 7"} - val p2: T = container.currentPosition() - log.info{"SGXX 8"} - val input = DebeziumInput(container.debeziumProperties(), r0.state, isSynthetic = false) - log.info{"SGXX 9"} - val r1: ReadResult = read(input, p1) - log.info{"SGXX 10"} - Assertions.assertEquals(insert + update, r1.records.take(insert.size + update.size)) - Assertions.assertNotNull(r1.closeReason) - log.info{"SGXX 11"} - val r2: ReadResult = read(input, p2) - log.info{"SGXX 12"} - Assertions.assertEquals( - insert + update + delete, - r2.records.take(insert.size + update.size + delete.size), - ) - Assertions.assertNotNull(r2.closeReason) - Assertions.assertNotEquals( - CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, - r2.closeReason - ) + + val inputFromR1: DebeziumInput = debeziumOperations.deserialize(debeziumOperations.serialize(r1.state), listOf(stream)) + log.info{"SGXX 15"} + val r3: ReadResult = read(inputFromR1, p2) + log.info{"SGXX 16"} + val p3: T = debeziumOperations.position(debeziumOperations.synthesize(listOf(stream)).state.offset) + log.info{"SGX syntheticInput=$syntheticInput"} + log.info{"SGX r0.state=${r0.state}"} + log.info{"SGX r1.state=${r1.state}"} + log.info{"SGX p1=$p1"} + log.info{"SGX p2=$p2"} + log.info{"SGX p3=$p3"} + log.info{"SGX r3=$r3"} + log.info{"SGX r3.records=${r3.records}"} + log.info{"SGX r3.state.offset.wrapped=${r3.state.offset.wrapped}"} + log.info{"SGX r2.state.offset.wrapped=${r2.state.offset.wrapped}"} + } } private fun read( @@ -174,7 +191,7 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab CdcPartitionReader( ConcurrencyResource(1), streamRecordConsumers, - cdcOperations, + getCdcOperations(), upperBound, input, ) @@ -208,10 +225,9 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab Assertions.assertEquals(0, reader.numEventsWithoutSourceRecord.get()) Assertions.assertEquals(0, reader.numSourceRecordsWithoutPosition.get()) Assertions.assertEquals(0, reader.numEventValuesWithoutPosition.get()) - log.info{"SGX outputConsumer.records()=${outputConsumer.records()}"} return ReadResult( outputConsumer.records().map { Jsons.treeToValue(it.data, Record::class.java) }, - cdcOperations.deserialize(checkpoint.opaqueStateValue, listOf(stream)).state, + debeziumOperations.deserialize(checkpoint.opaqueStateValue, listOf(stream)).state, reader.closeReasonReference.get(), ) } @@ -248,31 +264,9 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab } } - abstract fun createCdcOperations(): DebeziumOperations - abstract class AbstractCdcPartitionReaderDebeziumOperationsForTest> + abstract fun getCdcOperations(): DebeziumOperations + abstract inner class AbstractCdcPartitionReaderDebeziumOperationsForTest> (val stream: Stream): DebeziumOperations { - override fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List): DebeziumInput { - log.info{"SGX opaqueStateValue=$opaqueStateValue"} - val offsetNode: ObjectNode = opaqueStateValue["offset"] as ObjectNode - val offset = - DebeziumOffset( - offsetNode - .fields() - .asSequence() - .map { Jsons.readTree(it.key) to Jsons.readTree(it.value.asText()) } - .toMap(), - ) - val historyNode: ArrayNode = - opaqueStateValue["schemaHistory"] as? ArrayNode - ?: return DebeziumInput(emptyMap(), DebeziumState(offset, schemaHistory = null), false) - val schemaHistory = - DebeziumSchemaHistory( - historyNode.elements().asSequence().toList().map { - HistoryRecord(DocumentReader.defaultReader().read(it.asText())) - }, - ) - return DebeziumInput(emptyMap(), DebeziumState(offset, schemaHistory), false) - } override fun deserialize( key: DebeziumRecordKey, value: DebeziumRecordValue, @@ -316,10 +310,29 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab }, ), ) - override fun position(offset: DebeziumOffset): T { - TODO("Shouldn't be called from the test") + override fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List): DebeziumInput { + val offsetNode: ObjectNode = opaqueStateValue["offset"] as ObjectNode + val offset = + DebeziumOffset( + offsetNode + .fields() + .asSequence() + .map { Jsons.readTree(it.key) to Jsons.readTree(it.value.asText()) } + .toMap(), + ) + val historyNode: ArrayNode? = opaqueStateValue["schemaHistory"] as? ArrayNode + val schemaHistory: DebeziumSchemaHistory? = if (historyNode != null) { + DebeziumSchemaHistory( + historyNode.elements().asSequence().toList().map { + HistoryRecord(DocumentReader.defaultReader().read(it.asText())) + }, + ) + } else { + null + } + val deserializedStateValue = DebeziumState(offset, schemaHistory) + return DebeziumInput(emptyMap(), deserializedStateValue, false) } - } } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerDebeziumOperations.kt b/airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerDebeziumOperations.kt index f66214796ccb..7451c1b3817d 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerDebeziumOperations.kt @@ -73,7 +73,7 @@ class MsSqlServerDebeziumOperations( return TxLogPosition.valueOf(Lsn.valueOf(commitLsn), Lsn.valueOf(changeLsn)) } - override fun synthesize(): DebeziumInput { + override fun synthesize(streams: List): DebeziumInput { val lsn = queryMaxLsn() val key: ArrayNode = Jsons.arrayNode().apply { @@ -102,7 +102,7 @@ class MsSqlServerDebeziumOperations( // If not in snapshot mode, initial will make sure that a snapshot is taken if the transaction log // is rotated out. This will also end up read streaming changes from the transaction_log. .with("snapshot.mode", "recovery") - .withStreams(listOf()) + .withStreams(streams) .buildMap(), state, isSynthetic = true) } diff --git a/airbyte-integrations/connectors/source-mssql/src/test/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerCdcPartitionReaderTest.kt b/airbyte-integrations/connectors/source-mssql/src/test/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerCdcPartitionReaderTest.kt index f90d01f3a5e1..662dc3a5d50b 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerCdcPartitionReaderTest.kt +++ b/airbyte-integrations/connectors/source-mssql/src/test/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerCdcPartitionReaderTest.kt @@ -2,7 +2,9 @@ package io.airbyte.integrations.source.mssql import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.jdbc.JdbcConnectionFactory +import io.airbyte.cdk.read.Stream import io.airbyte.cdk.read.cdc.* import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons @@ -102,10 +104,12 @@ class MsSqlServerCdcPartitionReaderTest : } } - override fun createCdcOperations(): DebeziumOperations { - val config = MsSqlServerSourceConfigurationFactory().make(container.config) - val delegate = MsSqlServerDebeziumOperations(JdbcConnectionFactory(config), config) - return object: DebeziumOperations by delegate { + override fun getCdcOperations(): DebeziumOperations { + return object: AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { + override fun position(offset: DebeziumOffset): TxLogPosition { + return TxLogPosition.valueOf(Lsn.valueOf(offset.wrapped.values.first()["commit_lsn"].asText()), Lsn.valueOf(offset.wrapped.values.first()["change_lsn"].asText())) + } + override fun position(recordValue: DebeziumRecordValue): TxLogPosition? { val commitLsn: String = recordValue.source["commit_lsn"]?.takeIf { it.isTextual }?.asText() ?: return null @@ -120,42 +124,11 @@ class MsSqlServerCdcPartitionReaderTest : return TxLogPosition.valueOf(Lsn.valueOf(commitLsn), Lsn.valueOf(changeLsn)) } - override fun synthesize(): DebeziumInput { - val syntheticInput = delegate.synthesize() - - return DebeziumInput( - DebeziumPropertiesBuilder().with(syntheticInput.properties).withStreams(listOf(stream)).with("schema.include.list", "test").buildMap(), - syntheticInput.state, - isSynthetic = true) + override fun synthesize(streams: List): DebeziumInput { + val config = MsSqlServerSourceConfigurationFactory().make(container.config) + return MsSqlServerDebeziumOperations(JdbcConnectionFactory(config), config).synthesize(streams) } } } - - - - override fun MsSqlServercontainerWithCdc.currentPosition(): TxLogPosition { - val dbName = withStatement { statement: Statement -> - statement.executeQuery("SELECT DB_NAME()").use { - it.next() - it.getString(1) - } - } - log.info{"SGX dbNAme=$dbName"} - return withStatement { statement: Statement -> - statement.executeQuery("select sys.fn_cdc_get_max_lsn() as max_lsn").use { - it.next() - val lsn = Lsn.valueOf(it.getBytes("max_lsn")) - log.info{"SGX returning currentPosition=$lsn"} - TxLogPosition.valueOf(lsn, lsn) - } - } - } - - override fun MsSqlServercontainerWithCdc.debeziumProperties(): Map { - val config = MsSqlServerSourceConfigurationFactory().make(config) - val commonProperties = MsSqlServerDebeziumOperations.commonProperties(JdbcConnectionFactory(config).ensureTunnelSession(), databaseName, config) - return DebeziumPropertiesBuilder().with(commonProperties).withStreams(listOf(stream)).with("schema.include.list", "test").with("incremental.snapshot.chunk.size", "1").with("max.batch.size", "1").buildMap() - } - } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt index 97d43c0698a8..ced50550763f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt @@ -211,7 +211,7 @@ class MySqlDebeziumOperations( return MySqlPosition(file.toString(), pos) } - override fun synthesize(): DebeziumInput { + override fun synthesize(streams: List): DebeziumInput { val (mySqlPosition: MySqlPosition, gtidSet: String?) = queryPositionAndGtids() val topicPrefixName: String = DebeziumPropertiesBuilder.sanitizeTopicPrefix(databaseName) val timestamp: Instant = Instant.now() @@ -316,11 +316,11 @@ class MySqlDebeziumOperations( if (cdcValidationResult == CdcStateValidateResult.INVALID_RESET) { throw OffsetInvalidNeedsResyncIllegalStateException() } - return synthesize() + return synthesize(streams) } val properties: Map = - DebeziumPropertiesBuilder().with(commonProperties).withStreams(streams).buildMap() + DebeziumPropertiesBuilder().with(commonProperties).buildMap() return DebeziumInput(properties, debeziumState, isSynthetic = false) } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcPartitionReaderTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcPartitionReaderTest.kt index e013bc4c89ec..079855a16628 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcPartitionReaderTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcPartitionReaderTest.kt @@ -2,6 +2,8 @@ package io.airbyte.integrations.source.mysql import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.command.OpaqueStateValue +import io.airbyte.cdk.read.Stream import io.airbyte.cdk.read.cdc.* import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons @@ -66,8 +68,15 @@ class MysqlCdcPartitionReaderTest : connection.createStatement().use { fn(it) } } - override fun createCdcOperations(): DebeziumOperations { + override fun getCdcOperations(): DebeziumOperations { return object: AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { + override fun position(offset: DebeziumOffset): Position { + val offsetAsJson = offset.wrapped.values.first() + val retVal = Position(offsetAsJson["file"].asText(), offsetAsJson["pos"].asLong()) + log.info { "SGX returning retval=$retVal, offset=$offset" } + return retVal + } + override fun position(recordValue: DebeziumRecordValue): Position? { log.info{"SGX MySqlCdcPartitionReaderTest.position. recordValue=$recordValue"} val file: String = @@ -85,8 +94,8 @@ class MysqlCdcPartitionReaderTest : return Position(file, pos) } - override fun synthesize(): DebeziumInput { - val position: Position = container.currentPosition() + override fun synthesize(streams: List): DebeziumInput { + val position: Position = currentPosition() val timestamp: Instant = Instant.now() val key: ArrayNode = Jsons.arrayNode().apply { @@ -103,43 +112,50 @@ class MysqlCdcPartitionReaderTest : val state = DebeziumState(offset, schemaHistory = null) val syntheticProperties: Map = DebeziumPropertiesBuilder() - .with(container.debeziumProperties()) + .with(debeziumProperties()) .with("snapshot.mode", "recovery") .withStreams(listOf()) .buildMap() return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - } - } - override fun MySQLContainer<*>.currentPosition(): Position = - withStatement { statement: Statement -> - statement.executeQuery("SHOW MASTER STATUS").use { - it.next() - Position(it.getString("File"), it.getLong("Position")) + + override fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List): DebeziumInput { + return super.deserialize(opaqueStateValue, streams).let { + DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) } } - override fun MySQLContainer<*>.debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MySqlConnector::class.java) - .withDebeziumName(databaseName) - .withHeartbeats(heartbeat) - .with("include.schema.changes", "false") - .with("connect.keep.alive.interval.ms", "1000") - .withDatabase("hostname", host) - .withDatabase("port", firstMappedPort.toString()) - .withDatabase("user", username) - .withDatabase("password", password) - .withDatabase("dbname", databaseName) - .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) - .withDatabase("include.list", databaseName) - .withOffset() - .withSchemaHistory() - .with("snapshot.mode", "when_needed") - .withStreams(listOf(stream)) - .buildMap() + fun currentPosition(): Position = + container.withStatement { statement: Statement -> + statement.executeQuery("SHOW MASTER STATUS").use { + it.next() + Position(it.getString("File"), it.getLong("Position")) + } + } + + fun debeziumProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MySqlConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("include.schema.changes", "false") + .with("connect.keep.alive.interval.ms", "1000") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) + .withDatabase("include.list", container.databaseName) + .withOffset() + .withSchemaHistory() + .with("snapshot.mode", "when_needed") + .withStreams(listOf(stream)) + .buildMap() + } + } }