Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

passing tests #49933

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
val activeStreams: List<Stream> by lazy {
feedBootstrap.feed.streams.filter { feedBootstrap.stateQuerier.current(it) != null }
}
val syntheticInput: DebeziumInput by lazy { creatorOps.synthesize() }
val syntheticInput: DebeziumInput by lazy { creatorOps.synthesize(feedBootstrap.feed.streams) }
// Ensure that the WAL position upper bound has been computed for this sync.
val upperBound: T =
upperBoundReference.updateAndGet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ interface CdcPartitionsCreatorDebeziumOperations<T : Comparable<T>> {
fun position(offset: DebeziumOffset): T

/** Synthesizes a [DebeziumInput] when no incumbent [OpaqueStateValue] is available. */
fun synthesize(): DebeziumInput
fun synthesize(streams: List<Stream>): DebeziumInput

/** Builds a [DebeziumInput] using an incumbent [OpaqueStateValue]. */
fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List<Stream>): DebeziumInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.util.Jsons
import io.debezium.connector.mongodb.MongoDbConnector
Expand Down Expand Up @@ -76,7 +77,7 @@ class CdcPartitionReaderMongoTest :
fn(it.getCollection(stream.name))
}

override fun getCdcOperations(): CdcPartitionReaderDebeziumOperations<BsonTimestamp> {
override fun getCdcOperations(): DebeziumOperations<BsonTimestamp> {
return object : AbstractCdcPartitionReaderDebeziumOperationsForTest<BsonTimestamp>(stream) {
override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? {
val resumeToken: String =
Expand All @@ -90,6 +91,13 @@ class CdcPartitionReaderMongoTest :
return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64))
}

override fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List<Stream>): DebeziumInput {
return super.deserialize(opaqueStateValue, streams).let {
DebeziumInput(debeziumProperties(), it.state, it.isSynthetic)

}
}

override fun deserialize(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
Expand Down Expand Up @@ -122,61 +130,61 @@ class CdcPartitionReaderMongoTest :
changes = emptyMap(),
)
}
}
}

override fun MongoDbReplicaSet.currentPosition(): BsonTimestamp =
ResumeTokens.getTimestamp(currentResumeToken())

override fun MongoDbReplicaSet.syntheticInput(): DebeziumInput {
val resumeToken: BsonDocument = currentResumeToken()
val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken)
val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value
val key: ArrayNode =
Jsons.arrayNode().apply {
add(stream.namespace)
add(Jsons.objectNode().apply { put("server_id", stream.namespace) })
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ord", timestamp.inc)
put("sec", timestamp.time)
put("resume_token", resumeTokenString)
override fun position(offset: DebeziumOffset): BsonTimestamp {
val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode
return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt())
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> = debeziumProperties()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}

private fun MongoDbReplicaSet.currentResumeToken(): BsonDocument =
withMongoDatabase { mongoDatabase: MongoDatabase ->
val pipeline = listOf<Bson>(Aggregates.match(Filters.`in`("ns.coll", stream.name)))
mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use {
it.tryNext()
it.resumeToken!!
override fun synthesize(streams: List<Stream>): DebeziumInput {
val resumeToken: BsonDocument = currentResumeToken()
val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken)
val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value
val key: ArrayNode =
Jsons.arrayNode().apply {
add(stream.namespace)
add(Jsons.objectNode().apply { put("server_id", stream.namespace) })
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ord", timestamp.inc)
put("sec", timestamp.time)
put("resume_token", resumeTokenString)
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> = debeziumProperties()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}
}

override fun MongoDbReplicaSet.debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MongoDbConnector::class.java)
.withDebeziumName(stream.namespace!!)
.withHeartbeats(heartbeat)
.with("capture.scope", "database")
.with("capture.target", stream.namespace!!)
.with("mongodb.connection.string", connectionString)
.with("snapshot.mode", "no_data")
.with(
fun currentResumeToken(): BsonDocument =
container.withMongoDatabase { mongoDatabase: MongoDatabase ->
val pipeline = listOf<Bson>(Aggregates.match(Filters.`in`("ns.coll", stream.name)))
mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use {
it.tryNext()
it.resumeToken!!
}
}

fun debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MongoDbConnector::class.java)
.withDebeziumName(stream.namespace!!)
.withHeartbeats(heartbeat)
.with("capture.scope", "database")
.with("capture.target", stream.namespace!!)
.with("mongodb.connection.string", container.connectionString)
.with("snapshot.mode", "no_data")
.with(
"collection.include.list",
DebeziumPropertiesBuilder.joinIncludeList(
listOf(Pattern.quote("${stream.namespace!!}.${stream.name}"))
listOf(Pattern.quote("${stream.namespace!!}.${stream.name}"))
)
)
.with("database.include.list", stream.namespace!!)
.withOffset()
.buildMap()


)
.with("database.include.list", stream.namespace!!)
.withOffset()
.buildMap()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.testcontainers.TestContainerFactory
import io.airbyte.cdk.util.Jsons
Expand Down Expand Up @@ -85,8 +86,13 @@ class CdcPartitionReaderPostgresTest :
connection.createStatement().use { fn(it) }
}

override fun getCdcOperations(): CdcPartitionReaderDebeziumOperations<LogSequenceNumber> {
override fun getCdcOperations(): DebeziumOperations<LogSequenceNumber> {
return object: AbstractCdcPartitionReaderDebeziumOperationsForTest<LogSequenceNumber>(stream) {
override fun position(offset: DebeziumOffset): LogSequenceNumber {
val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode
return LogSequenceNumber.valueOf(offsetValue["lsn"].asLong())
}

override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? {
val lsn: Long =
recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null
Expand All @@ -98,63 +104,63 @@ class CdcPartitionReaderPostgresTest :
val lsn: Long = offset["lsn"] as? Long ?: return null
return LogSequenceNumber.valueOf(lsn)
}
}
}

override fun deserialize(opaqueStateValue: OpaqueStateValue, streams: List<Stream>): DebeziumInput {
return super.deserialize(opaqueStateValue, streams).let {
DebeziumInput(debeziumProperties(), it.state, it.isSynthetic)

override fun PostgreSQLContainer<*>.currentPosition(): LogSequenceNumber =
withStatement { statement: Statement ->
statement.executeQuery("SELECT pg_current_wal_lsn()").use {
it.next()
LogSequenceNumber.valueOf(it.getString(1))
}
}

override fun PostgreSQLContainer<*>.syntheticInput(): DebeziumInput {
val (position: LogSequenceNumber, txID: Long) =
withStatement { statement: Statement ->
statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use {
it.next()
LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2)
}
}
val timestamp: Instant = Instant.now()
val key: ArrayNode =
Jsons.arrayNode().apply {
add(databaseName)
add(Jsons.objectNode().apply { put("server", databaseName) })
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ts_usec", timestamp.toEpochMilli() * 1000L)
put("lsn", position.asLong())
put("txId", txID)

override fun synthesize(streams: List<Stream>): DebeziumInput {
val (position: LogSequenceNumber, txID: Long) =
container.withStatement { statement: Statement ->
statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use {
it.next()
LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2)
}
}
val timestamp: Instant = Instant.now()
val key: ArrayNode =
Jsons.arrayNode().apply {
add(container.databaseName)
add(Jsons.objectNode().apply { put("server", container.databaseName) })
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ts_usec", timestamp.toEpochMilli() * 1000L)
put("lsn", position.asLong())
put("txId", txID)
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> = debeziumProperties()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> = debeziumProperties()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}

override fun PostgreSQLContainer<*>.debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(PostgresConnector::class.java)
.withDebeziumName(databaseName)
.withHeartbeats(heartbeat)
.with("plugin.name", "pgoutput")
.with("slot.name", SLOT_NAME)
.with("publication.name", PUBLICATION_NAME)
.with("publication.autocreate.mode", "disabled")
.with("flush.lsn.source", "false")
.withDatabase("hostname", host)
.withDatabase("port", firstMappedPort.toString())
.withDatabase("user", username)
.withDatabase("password", password)
.withDatabase("dbname", databaseName)
.withOffset()
.withStreams(listOf(stream))
.buildMap()
fun debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(PostgresConnector::class.java)
.withDebeziumName(container.databaseName)
.withHeartbeats(heartbeat)
.with("plugin.name", "pgoutput")
.with("slot.name", SLOT_NAME)
.with("publication.name", PUBLICATION_NAME)
.with("publication.autocreate.mode", "disabled")
.with("flush.lsn.source", "false")
.withDatabase("hostname", container.host)
.withDatabase("port", container.firstMappedPort.toString())
.withDatabase("user", container.username)
.withDatabase("password", container.password)
.withDatabase("dbname", container.databaseName)
.withOffset()
.withStreams(listOf(stream))
.buildMap()
}


}
}


Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class CdcPartitionsCreatorTest {
every { stateQuerier.feeds } returns listOf(global, stream)
every { creatorOps.position(syntheticOffset) } returns 123L
every { creatorOps.position(incumbentOffset) } returns 123L
every { creatorOps.synthesize() } returns syntheticInput
every { creatorOps.synthesize(listOf(stream)) } returns syntheticInput
}

@Test
Expand All @@ -105,7 +105,7 @@ class CdcPartitionsCreatorTest {
state = DebeziumState(offset = syntheticOffset, schemaHistory = null),
isSynthetic = true,
)
every { creatorOps.synthesize() } returns syntheticInput
every { creatorOps.synthesize(listOf(stream)) } returns syntheticInput
upperBoundReference.set(null)
val readers: List<PartitionReader> = runBlocking { creator.run() }
Assertions.assertEquals(1, readers.size)
Expand Down
Loading
Loading