This demo automatically deploys the topology of services as defined in the Debezium Tutorial.
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-mysql.yaml up
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Modify records in the database via MySQL client
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
# Shut down the cluster
docker-compose -f docker-compose-mysql.yaml down
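The Kafka Connect REST API may not accept requests immediately after the containers start, so registering the connector right after `up` can fail. A minimal sketch of a retry wrapper follows; the function name `retry` and the attempt and delay values in the usage comment are illustrative assumptions, not part of the tutorial.

```shell
# Run a command until it succeeds, up to a maximum number of attempts.
# Usage: retry <attempts> <delay-seconds> <command> [args...]
retry() {
  attempts="$1"
  delay="$2"
  shift 2
  i=1
  while ! "$@"; do
    if [ "$i" -ge "$attempts" ]; then
      return 1   # give up after the last allowed attempt
    fi
    i=$((i + 1))
    sleep "$delay"
  done
}

# Example (assumed values): wait for Kafka Connect, then register the connector.
# retry 30 2 curl -sf -o /dev/null http://localhost:8083/connectors/ && \
#   curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
#     http://localhost:8083/connectors/ -d @register-mysql.json
```

The wrapper only polls the REST endpoint used above; it does not check that the database itself is ready.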
To use Avro-style messages instead of JSON, Avro can be configured in one of two ways: in the Kafka Connect worker configuration or in the connector configuration. Using Avro in conjunction with the schema registry allows for much more compact messages.
Configuring Avro at the Kafka Connect worker involves the same steps as above for MySQL, but using the docker-compose-mysql-avro-worker.yaml configuration file instead. The Compose file configures the Connect service to use the Avro (de-)serializers for the Connect instance and starts one additional service, the Confluent schema registry.
Configuring Avro at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-avro-connector.yaml and register-mysql-avro.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Confluent schema registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to use Avro, and specifies the location of the schema registry.
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
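The URLs above follow the pattern `<registry>/subjects/<topic>-value/versions/<version>`. As a small sketch, the helper below builds such a URL for any topic; the function name `schema_url` and the `REGISTRY_URL` variable are assumptions made for illustration. It relies on the Confluent Schema Registry API accepting `latest` as a version identifier, and on key schemas being registered under `<topic>-key` (the default subject naming).

```shell
# Build the Schema Registry URL for a topic's value (or key) schema.
# Usage: schema_url <topic> [value|key] [version|latest]
# REGISTRY_URL is a hypothetical override; it defaults to the address used above.
schema_url() {
  registry="${REGISTRY_URL:-http://localhost:8081}"
  topic="$1"
  part="${2:-value}"
  version="${3:-latest}"
  printf '%s/subjects/%s-%s/versions/%s\n' "$registry" "$topic" "$part" "$version"
}

# Example: fetch the newest value schema after altering the customers table.
# curl -s "$(schema_url dbserver1.inventory.customers)" | jq '.schema | fromjson'
```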
The schema registry also comes with a console consumer that can read the Avro messages:
docker-compose -f docker-compose-mysql-avro-worker.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property schema.registry.url=http://schema-registry:8081 \
--topic dbserver1.inventory.customers
Apicurio Registry is an open-source API and schema registry that, among other things, can be used to store the schemas of Kafka records. It provides:
- its own native Avro converter and Protobuf serializer
- a JSON converter that exports its schema into the registry
- a compatibility layer with other schema registries such as IBM's or Confluent's; it can be used with the Confluent Avro converter.
For the Apicurio examples we will use the following deployment topology:
Configuring the JSON converter with an externalized schema at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio-converter-json.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to use the JSON converter with an externalized schema, and specifies the location of the Apicurio registry.
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value | jq .
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
You can consume the JSON messages in the same way as with the standard JSON converter:
docker-compose -f docker-compose-mysql-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
When you look at a data message, you will notice that it contains only the payload part but not the schema part, as the schema is externalized into the registry.
Configuring Avro at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio-converter-avro.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to use Avro, and specifies the location of the Apicurio registry.
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value | jq .
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
Apicurio Registry can also be used with the Confluent Avro converter through its compatibility layer. Configuring this at the Debezium connector again involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to use Avro, and specifies the location of the Apicurio registry.
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8080/api/ccompat/subjects/dbserver1.inventory.customers-value/versions/1
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8080/api/ccompat/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
To consume the Avro messages, you can use the kafkacat tool:
docker run --rm --tty \
--network tutorial_default \
debezium/tooling \
kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://apicurio:8080/api/ccompat \
-t dbserver1.inventory.customers | jq .
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-postgres.yaml up
# Start Postgres connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Modify records in the database via Postgres client
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'
# Shut down the cluster
docker-compose -f docker-compose-postgres.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-mongodb.yaml up
# Initialize MongoDB replica set and insert some test data
docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c '/usr/local/bin/init-inventory.sh'
# Start MongoDB connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Modify records in the database via MongoDB client
docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'
db.customers.insert([
{ _id : NumberLong("1005"), first_name : 'Bob', last_name : 'Hopper', email : '[email protected]', unique_id : UUID() }
]);
# Shut down the cluster
docker-compose -f docker-compose-mongodb.yaml down
This assumes Oracle is running on localhost (or is reachable there, e.g. by running it within a VM or Docker container with appropriate port configurations) and is set up with the configuration, users and grants described in the Debezium Vagrant set-up. Note that the connector uses the XStream API, which requires a license for the GoldenGate product (which itself does not need to be installed, though).
You must also download the Oracle instant client for Linux and put it under the directory debezium-with-oracle-jdbc/oracle_instantclient.
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-oracle.yaml up --build
# Insert test data
cat debezium-with-oracle-jdbc/init/inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
Adjust the host name of the database server and the name of the XStream outbound server in register-oracle.json as per your environment.
Then register the Debezium Oracle connector:
# Start Oracle connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle.json
# Create a test change record
echo "INSERT INTO customers VALUES (NULL, 'John', 'Doe', '[email protected]');" | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
# Consume messages from a Debezium topic
docker-compose -f docker-compose-oracle.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic server1.DEBEZIUM.CUSTOMERS
# Modify other records in the database via Oracle client
docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
# Shut down the cluster
docker-compose -f docker-compose-oracle.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-sqlserver.yaml up
# Initialize database and insert test data
cat debezium-sqlserver-init/inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
# Start SQL Server connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic server1.dbo.customers
# Modify records in the database via SQL Server client (do not forget to add `GO` command to execute the statement)
docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'
# Shut down the cluster
docker-compose -f docker-compose-sqlserver.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-db2.yaml up --build
# Start DB2 connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-db2.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-db2.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic db2server.DB2INST1.CUSTOMERS
# Modify records in the database via DB2 client
docker-compose -f docker-compose-db2.yaml exec db2server bash -c 'su - db2inst1'
db2 connect to TESTDB
db2 "INSERT INTO DB2INST1.CUSTOMERS(first_name, last_name, email) VALUES ('John', 'Doe', '[email protected]');"
# Shut down the cluster
docker-compose -f docker-compose-db2.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-cassandra.yaml up --build
# Consume messages from a Debezium topic
docker-compose -f docker-compose-cassandra.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic server1.dbo.customers
# Modify records in the database via Cassandra client (note the TX logs will only be flushed out,
# and thus be picked up by the connector, after accumulating 1 MB of changes)
docker-compose -f docker-compose-cassandra.yaml exec cassandra bash -c 'cqlsh --keyspace=testdb'
INSERT INTO customers(id,first_name,last_name,email) VALUES (5,'Roger','Poor','[email protected]');
UPDATE customers set first_name = 'Barry' where id = 5;
DELETE FROM customers WHERE id = 5;
# Shut down the cluster
docker-compose -f docker-compose-cassandra.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-vitess.yaml up --build
# Start Vitess connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-vitess.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-vitess.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.products
# Modify records in the database via MySQL client
docker-compose -f docker-compose-vitess.yaml exec vitess bash -c 'mysql -h 127.0.0.1 -P 15306 inventory'
INSERT INTO products (name, description, weight) VALUES ('Debezium in Action', 'Book', 10);
UPDATE products SET description = 'Video' WHERE id = 1000;
DELETE FROM products WHERE id = 1000;
# Shut down the cluster
docker-compose -f docker-compose-vitess.yaml down
In this example, the Vitess setup has 2 keyspaces (a.k.a. schemas in MySQL terms):
- 1 unsharded keyspace: customer, which contains 1 table, customer, and some technical sequence tables used by Vitess itself.
- 1 sharded keyspace: inventory, which contains 3 tables: products, products_on_hand and orders. This sharded keyspace has 2 shards (-80 and 80-). The sharding key is the product id column of each of the 3 tables, that is, the id column in the products table, the product_id column in the products_on_hand table, and the product_id column in the orders table.
The unsharded keyspace customer has only 1 shard; the sharded keyspace inventory has 2 shards. Each shard consists of its own MySQL cluster with 3 MySQL processes: 1 master and 2 replicas. Vitess uses 1 instance of vttablet as the sidecar for each MySQL process.
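The shard names -80 and 80- denote ranges of keyspace IDs: -80 covers keyspace IDs whose first byte is below 0x80, and 80- covers the rest. The sketch below picks the shard for a keyspace ID given as a hex string. How a product id is turned into a keyspace ID depends on the vindex configured for the keyspace, which is not covered here; the function name `shard_for_keyspace_id` and the sample hex values are hypothetical.

```shell
# Pick the shard (-80 or 80-) for a keyspace ID given as a hex string.
# Only the first byte matters when the keyspace has exactly these two shards.
shard_for_keyspace_id() {
  first_byte=$(printf '%s' "$1" | cut -c1-2)
  if [ $((0x$first_byte)) -lt $((0x80)) ]; then
    printf '%s\n' '-80'
  else
    printf '%s\n' '80-'
  fi
}

# Examples with made-up keyspace IDs:
# shard_for_keyspace_id 166b40b44aba4bd6
# shard_for_keyspace_id d2fd8867d50d2dfe
```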
There are 3 other Vitess processes running in the same container: vtgate, vtctld and etcd. Typically, MySQL clients (e.g. JDBC) send queries to vtgate, which routes them to the vttablets, which in turn run each query in their local MySQL instance. vtctld is a long-running process for admin operations such as adding new keyspaces. The keyspaces' metadata (a.k.a. the topology) is stored in etcd. vtgate is stateless; it reads the topology from etcd and caches it locally.
In summary, the following diagram shows the Vitess container's sharding setup and all the processes running inside the Vitess container:
If you want to experiment with taking down an individual Vitess component (e.g. vtgate), any MySQL process, or even a single shard, you need to get a shell within the Vitess container and kill the corresponding process(es). For example:
# Get a shell within the Vitess container
docker exec -it tutorial_vitess_1 bash
# List all the processes in the Vitess container
ps -ef
# Find and kill the vtgate process
ps -ef | grep vtgate
kill -9 <vtgate_process_id>
# Find and kill all the processes of the -80 shard
for pid in $(ps -ef | awk '/00000002/ {print $2}'); do kill -9 $pid; done
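Before killing anything, it can help to list the matching process IDs first. The following is a small sketch of a filter over `ps -ef` output; the function name `pids_matching` is an assumption. It skips any line that mentions awk, so that the filter does not report itself (which also hides any unrelated process with "awk" in its command line).

```shell
# Print the PIDs (second column of `ps -ef` output read from stdin) of all
# processes whose line matches the given pattern, ignoring awk processes.
pids_matching() {
  awk -v pat="$1" '$0 ~ pat && $0 !~ /awk/ {print $2}'
}

# Example: review the PIDs of the -80 shard before killing them.
# ps -ef | pids_matching 00000002
```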
Kafka Connect allows externalization of secrets into a separate configuration repository. The configuration is done at both worker and connector level.
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.4
docker-compose -f docker-compose-mysql-ext-secrets.yml up
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-ext-secrets.json
# Check plugin configuration to see that secrets are not visible
curl -s http://localhost:8083/connectors/inventory-connector/config | jq .
# Shut down the cluster
docker-compose -f docker-compose-mysql-ext-secrets.yml down
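The check above can also be scripted. The sketch below reads a connector configuration as JSON from standard input and succeeds only if every `database.password` value is a `${...}` placeholder, which is the syntax Kafka Connect config providers use. The function name `password_is_externalized` is an assumption, and so is the idea that this demo stores the password under the `database.password` key as such a placeholder.

```shell
# Succeed if every "database.password" value on stdin is a ${...} placeholder
# (or if the key is absent); fail if a plain-text value is found.
password_is_externalized() {
  ! grep -o '"database.password" *: *"[^"]*"' | grep -qv '"\${[^"]*}"$'
}

# Example:
# curl -s http://localhost:8083/connectors/inventory-connector/config \
#   | password_is_externalized && echo "password is not visible"
```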
Should you need to establish a remote debugging session into a deployed connector, add the following to the environment section of the connect service in the Compose file:
- KAFKA_DEBUG=true
- DEBUG_SUSPEND_FLAG=n
- JAVA_DEBUG_PORT=*:5005
Also expose the debugging port 5005 under ports:
- 5005:5005
You can then establish a remote debugging session from your IDE on localhost:5005.