Skip to content

Commit

Permalink
Upgrade debezium to 2.3.5.Final
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhil-ctds committed May 20, 2024
1 parent 9a4e571 commit 1cde451
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ flexible messaging model and an intuitive client API.</description>
<presto.version>334</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.10</scala-library.version>
<debezium.version>1.9.7.Final</debezium.version>
<debezium.version>2.3.5.Final</debezium.version>
<debezium.postgresql.version>42.5.0</debezium.postgresql.version>
<debezium.mysql.version>8.0.30</debezium.mysql.version>
<!-- Override version that brings CVE-2022-3143 with debezium -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);

// database.history : implementation class for database history.
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY);

// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -52,12 +52,12 @@
import org.apache.pulsar.client.api.Schema;

/**
* A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified
* A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified
* topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic.
*/
@Slf4j
@ThreadSafe
public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
public final class PulsarDatabaseHistory extends AbstractSchemaHistory {

public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
.withDisplayName("Database history topic name")
Expand Down Expand Up @@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
TOPIC,
SERVICE_URL,
CLIENT_BUILDER,
DatabaseHistory.NAME,
SchemaHistory.NAME,
READER_CONFIG);

private final ObjectMapper mapper = new ObjectMapper();
Expand All @@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
public void configure(
Configuration config,
HistoryRecordComparator comparator,
DatabaseHistoryListener listener,
SchemaHistoryListener listener,
boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
Expand Down Expand Up @@ -148,7 +148,7 @@ public void configure(
}

// Copy the relevant portions of the configuration and add useful defaults ...
this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());

log.info("Configure to store the debezium database history {} to pulsar topic {}",
dbHistoryName, topicName);
Expand Down Expand Up @@ -201,7 +201,7 @@ public void start() {
}

@Override
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+ " is called before storing database history records.");
Expand All @@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
try {
producer.send(record.toString());
} catch (PulsarClientException e) {
throw new DatabaseHistoryException(e);
throw new SchemaHistoryException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -80,8 +80,8 @@ protected void cleanup() throws Exception {
private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
Configuration.Builder configBuidler = Configuration.create()
.with(PulsarDatabaseHistory.TOPIC, topicName)
.with(DatabaseHistory.NAME, "my-db-history")
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);
.with(SchemaHistory.NAME, "my-db-history")
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);

if (testWithClientBuilder) {
ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString());
Expand All @@ -101,7 +101,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
}

// Start up the history ...
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
history.start();

// Should be able to call start more than once ...
Expand Down Expand Up @@ -160,7 +160,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
// Stop the history (which should stop the producer) ...
history.stop();
history = new PulsarDatabaseHistory();
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
// no need to start

// Recover from the very beginning to just past the first change ...
Expand Down Expand Up @@ -240,11 +240,11 @@ public void testExists() throws Exception {
Configuration config = Configuration.create()
.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
.with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic")
.with(DatabaseHistory.NAME, "my-db-history")
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.with(SchemaHistory.NAME, "my-db-history")
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.build();

history.configure(config, null, DatabaseHistoryListener.NOOP, true);
history.configure(config, null, SchemaHistoryListener.NOOP, true);
history.start();

// dummytopic should not exist yet
Expand Down

0 comments on commit 1cde451

Please sign in to comment.