From caedb8f60e9bf6d5698ce65fcec7e5cdab12bd7d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 4 Nov 2024 09:52:37 +0800 Subject: [PATCH] cdc: make mq encoding protocol related more clear (#18783) (#18986) --- ticdc/ticdc-avro-protocol.md | 87 +++++++++++++++++++++++------------- ticdc/ticdc-sink-to-kafka.md | 13 +++--- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/ticdc/ticdc-avro-protocol.md b/ticdc/ticdc-avro-protocol.md index 4a90f30756ef..da4cdc902380 100644 --- a/ticdc/ticdc-avro-protocol.md +++ b/ticdc/ticdc-avro-protocol.md @@ -5,7 +5,9 @@ summary: 了解 TiCDC Avro Protocol 的概念和使用方法。 # TiCDC Avro Protocol -Avro 是由 [Apache Avro™](https://avro.apache.org/) 定义的一种数据交换格式协议,[Confluent Platform](https://docs.confluent.io/platform/current/platform.html) 选择它作为默认的数据交换格式。通过本文,你可以了解 TiCDC 对 Avro 数据格式的实现,包括 TiDB 扩展字段、Avro 数据格式定义,以及和 [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) 的交互。 +TiCDC Avro 协议,是对 [Confluent Platform](https://docs.confluent.io/platform/current/platform.html) 定义的 [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) 数据传输格式的第三方实现。Avro 是由 [Apache Avro™](https://avro.apache.org/) 定义的一种数据交换格式。 + +通过本文,你可以了解 TiCDC Avro 数据协议的实现,以及和 [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) 的交互。 > **警告:** > @@ -13,14 +15,18 @@ Avro 是由 [Apache Avro™](https://avro.apache.org/) 定义的一种数据交 ## 使用 Avro -当使用 Message Queue (MQ) 作为下游 Sink 时,你可以在 `sink-uri` 中指定使用 Avro。TiCDC 获取 TiDB 的 DML 事件,并将这些事件封装到 Avro Message,然后发送到下游。当 Avro 检测到 schema 变化时,会向 Schema Registry 注册最新的 schema。 - 使用 Avro 时的配置样例如下所示: ```shell cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml ``` +`--schema-registry` 的值支持 `https` 协议和 `username:password` 认证,username 和 password 必须经过 URL 编码。例如,`--schema-registry=https://username:password@schema-registry-uri.com`。 + +> **注意:** +> +> 使用 Avro 协议时,一个 Kafka Topic 只能包含一张表的数据。你需要在配置文件中配置 [Topic 分发器](/ticdc/ticdc-sink-to-kafka.md#topic-分发器)。 + ```shell [sink] dispatchers = [ @@ -28,29 +34,6 @@ dispatchers = [ ] ``` -`--schema-registry` 的值支持 https 协议和 username:password 认证,比如`--schema-registry=https://username:password@schema-registry-uri.com`,username 和 password 必须经过 URL 编码。 - -## TiDB 扩展字段 - -默认情况下,Avro 只收集在 DML 事件中发生数据变更的行的所有数据信息,不收集数据变更的类型和 TiDB 专有的 CommitTS 事务唯一标识信息。为了解决这个问题,TiCDC 在 Avro 协议格式中附加了 TiDB 扩展字段。当 `sink-uri` 中设置 `enable-tidb-extension` 为 `true` (默认为 `false`)后,TiCDC 生成 Avro 消息时会新增三个字段: - -- `_tidb_op`:DML 的类型,"c" 表示插入,"u" 表示更新。 -- `_tidb_commit_ts`:事务唯一标识信息。 -- `_tidb_commit_physical_time`:事务标识信息中现实时间的时间戳。 - -配置样例如下所示: - -```shell -cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml -``` - - ```shell - [sink] - dispatchers = [ - {matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, - ] - ``` - ## 数据格式定义 TiCDC 会将一个 DML 事件转换为一个 kafka 事件,其中事件的 key 和 value 都按照 Avro 协议进行编码。 @@ -89,7 +72,36 @@ Key 中的 `fields` 只包含主键或唯一索引列。 } ``` -Value 数据格式默认与 Key 数据格式相同,但是 Value 的 `fields` 中包含了所有的列,而不仅仅是主键列。 +Value 数据格式默认与 Key 数据格式相同,但是 Value 的 `fields` 中包含了所有的列。 + +> **注意:** +> +> Avro 协议在编码 DML 事件时,操作方式如下: +> +> - 对于 Delete 事件,只编码 Key 部分,Value 部分为空。 +> - 对于 Insert 事件,编码所有列数据到 Value 部分。 +> - 对于 Update 事件,只编码更新后的所有列数据到 Value 部分。 + +## TiDB 扩展字段 + +默认情况下,Avro 只编码在 DML 事件中发生数据变更的行的所有列数据信息,不收集数据变更的类型和 TiDB 专有的 CommitTS 事务唯一标识信息。为了解决这个问题,TiCDC 在 Avro 协议格式中附加了 TiDB 扩展字段。当 `sink-uri` 中设置 `enable-tidb-extension` 为 `true` (默认为 `false`)后,TiCDC 生成 Avro 消息时,会在 Value 部分新增三个字段: + +- `_tidb_op`:DML 的类型,"c" 表示插入,"u" 表示更新。 +- `_tidb_commit_ts`:事务唯一标识信息。 +- `_tidb_commit_physical_time`:事务标识信息中现实时间的时间戳。 + +配置样例如下所示: + +```shell +cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml +``` + + ```shell + [sink] + dispatchers = [ + {matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, + ] + ``` 如果开启了 [TiDB 扩展字段](#tidb-扩展字段),那么 Value 数据格式将会变成: @@ -155,7 +167,7 @@ Column 数据格式即 Key/Value 数据格式中的 `{{ColumnValueBlock}}` 部 - `{{ColumnName}}` 表示列名。 - `{{TIDB_TYPE}}` 表示对应到 TiDB 中的类型,与原始的 SQL Type 不是一一对应关系。 -- `{{AVRO_TYPE}}` 表示 [avro spec](https://avro.apache.org/docs/current/spec.html) 中的类型。 +- `{{AVRO_TYPE}}` 表示 [Avro Specification](https://avro.apache.org/docs/++version++/specification) 中的类型。 | SQL TYPE | TIDB_TYPE | AVRO_TYPE | 说明 | |------------|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------| @@ -266,14 +278,27 @@ DECIMAL(10, 4) ## DDL 事件与 Schema 变更 -Avro 并不会向下游生成 DDL 事件。Avro 会在每次 DML 事件发生时检测是否发生 schema 变更,如果发生了 schema 变更,Avro 会生成新的 schema,并尝试向 Schema Registry 注册。注册时,Schema Registry 会做兼容性检测,如果此次 schema 变更没有通过兼容性检测,注册将会失败,TiCDC 并不会尝试解决 schema 的兼容性问题。 +Avro 协议并不会向下游发送 DDL 事件和 Watermark 事件。Avro 会在每次 DML 事件发生时检测是否发生 schema 变更,如果发生了 schema 变更,Avro 会生成新的 schema,并尝试向 Schema Registry 注册。注册时,Schema Registry 会做兼容性检测,如果此次 schema 变更没有通过兼容性检测,注册将会失败,TiCDC 并不会尝试解决 schema 的兼容性问题。 -同时,即使 schema 变更通过兼容性检测并成功注册新版本,数据的生产者和消费者可能仍然需要升级才能正确工作。 +比如,Confluent Schema Registry 默认的[兼容性策略](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types)是 `BACKWARD`,在这种策略下,如果你在源表增加一个非空列,Avro 在生成新 schema 向 Schema Registry 注册时将会因为兼容性问题失败,这个时候 changefeed 将会进入 error 状态。 -比如,Confluent Schema Registry 默认的兼容性策略是 BACKWARD,在这种策略下,如果你在源表增加一个非空列,Avro 在生成新 schema 向 Schema Registry 注册时将会因为兼容性问题失败,这个时候 changefeed 将会进入 error 状态。 +同时,即使 schema 变更通过兼容性检测并成功注册新版本,数据的生产者和消费者可能仍然需要获取到新版本的 schema,才能对数据进行正确编解码。 如需了解更多 schema 相关信息,请参阅 [Schema Registry 的相关文档](https://docs.confluent.io/platform/current/schema-registry/avro.html)。 +## 消费者实现 + +TiCDC Avro 协议支持被 [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer) 反序列化。 + +消费者程序可以通过 [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html) 获取到最新的 schema,然后对数据进行反序列化。 + +### 区分事件类型 + +消费者程序可以按照如下规则区分 DML 事件类型: + +* 只有 Key 部分,则是 Delete 事件。 +* 含有 Value 部分,则是 Insert 或 Update 事件。如果用户开启了 [TiDB 扩展字段功能](#tidb-扩展字段),可以根据其中的 `_tidb_op` 字段,判断该条事件变更是 Insert 或 Update。如果未开启 TiDB 扩展字段功能,则无法区分。 + ## Topic 分发 Schema Registry 支持三种 [Subject Name Strategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy):TopicNameStrategy、RecordNameStrategy 和 TopicRecordNameStrategy。目前 TiCDC Avro 只支持 TopicNameStrategy 一种,这意味着一个 kafka topic 只能接收一种数据格式的数据,所以 TiCDC Avro 禁止将多张表映射到同一个 topic。在创建 changefeed 时,如果配置的分发规则中,topic 规则不包含 `{schema}` 和 `{table}` 占位符,将会报错。 diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index 62b4c3e022d2..924a529e36d2 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -7,7 +7,7 @@ summary: 了解如何使用 TiCDC 将数据同步到 Kafka。 本文介绍如何使用 TiCDC 创建一个将增量数据复制到 Kafka 的 Changefeed。 -## 创建同步任务,复制增量数据 Kafka +## 创建同步任务,复制增量数据到 Kafka 使用以下命令来创建同步任务: @@ -73,6 +73,7 @@ URI 中可配置的的参数如下: | `127.0.0.1` | 下游 Kafka 对外提供服务的 IP。 | | `9092` | 下游 Kafka 的连接端口。 | | `topic-name` | 变量,使用的 Kafka topic 名字。 | +| `protocol` | 输出到 Kafka 的消息协议。可选值有 [`canal-json`](/ticdc/ticdc-canal-json.md)、[`open-protocol`](/ticdc/ticdc-open-protocol.md)、[`avro`](/ticdc/ticdc-avro-protocol.md)、[`debezium`](/ticdc/ticdc-debezium.md) 和 [`simple`](/ticdc/ticdc-simple-protocol.md)。 | | `kafka-version` | 下游 Kafka 版本号。该值需要与下游 Kafka 的实际版本保持一致。 | | `kafka-client-id` | 指定同步任务的 Kafka 客户端的 ID(可选,默认值为 `TiCDC_sarama_producer_同步任务的 ID`)。 | | `partition-num` | 下游 Kafka partition 数量(可选,不能大于实际 partition 数量,否则创建同步任务会失败,默认值 `3`)。| @@ -80,7 +81,6 @@ URI 中可配置的的参数如下: | `replication-factor` | Kafka 消息保存副本数(可选,默认值 `1`),需要大于等于 Kafka 中 [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) 的值。 | | `required-acks` | 在 `Produce` 请求中使用的配置项,用于告知 broker 需要收到多少副本确认后才进行响应。可选值有:`0`(`NoResponse`:不发送任何响应,只有 TCP ACK),`1`(`WaitForLocal`:仅等待本地提交成功后再响应)和 `-1`(`WaitForAll`:等待所有同步副本提交后再响应。最小同步副本数量可通过 broker 的 [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) 配置项进行配置)。(可选,默认值为 `-1`)。 | | `compression` | 设置发送消息时使用的压缩算法(可选值为 `none`、`lz4`、`gzip`、`snappy` 和 `zstd`,默认值为 `none`)。注意 Snappy 压缩文件必须遵循[官方 Snappy 格式](https://github.com/google/snappy)。不支持其他非官方压缩格式。| -| `protocol` | 输出到 Kafka 的消息协议,可选值有 `canal-json`、`open-protocol`、`avro`、`debezium` 和 `simple`。 | | `auto-create-topic` | 当传入的 `topic-name` 在 Kafka 集群不存在时,TiCDC 是否要自动创建该 topic(可选,默认值 `true`)。 | | `enable-tidb-extension` | 可选,默认值是 `false`。当输出协议为 `canal-json` 时,如果该值为 `true`,TiCDC 会发送 [WATERMARK 事件](/ticdc/ticdc-canal-json.md#watermark-event),并在 Kafka 消息中添加 TiDB 扩展字段。从 6.1.0 开始,该参数也可以和输出协议 `avro` 一起使用。如果该值为 `true`,TiCDC 会在 Kafka 消息中添加[三个 TiDB 扩展字段](/ticdc/ticdc-avro-protocol.md#tidb-扩展字段)。| | `max-batch-size` | 从 v4.0.9 开始引入。当消息协议支持把多条变更记录输出至一条 Kafka 消息时,该参数用于指定这一条 Kafka 消息中变更记录的最多数量。目前,仅当 Kafka 消息的 `protocol` 为 `open-protocol` 时有效(可选,默认值 `16`)。| @@ -109,12 +109,13 @@ URI 中可配置的的参数如下: ### 最佳实践 * TiCDC 推荐用户自行创建 Kafka Topic,你至少需要设置该 Topic 每次向 Kafka broker 发送消息的最大数据量和下游 Kafka partition 的数量。在创建 changefeed 的时候,这两项设置分别对应 `max-message-bytes` 和 `partition-num` 参数。 -* 如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用 `partition-num` 和 `replication-factor` 参数自行创建 Topic。建议明确指定这两个参数。 +* 如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用 `partition-num` 和 `replication-factor` 参数自行创建 Topic,建议明确指定这两个参数。 * 在大多数情况下,建议使用 `canal-json` 协议。 > **注意:** > -> 当 `protocol` 为 `open-protocol` 时,TiCDC 会尽量避免产生长度超过 `max-message-bytes` 的消息。但如果单条数据变更记录需要超过 `max-message-bytes` 个字节来表示,为了避免静默失败,TiCDC 会试图输出这条消息并在日志中输出 Warning。 +> 当 `protocol` 为 `open-protocol` 时,TiCDC 会将多个事件编码到同一个 Kafka 消息中,并尽量避免在此过程中生成长度超过 `max-message-bytes` 的消息。 +> 如果单条数据变更编码得到的消息大小超过了 `max-message-bytes` 个字节,changefeed 会报错,并打印错误日志。 ### TiCDC 使用 Kafka 的认证与授权 @@ -172,7 +173,7 @@ URI 中可配置的的参数如下: ### TiCDC 集成 Kafka Connect (Confluent Platform) -如要使用 Confluent 提供的 [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) 向关系型或非关系型数据库传输数据,请选择 `avro` 协议,并在 `schema-registry` 中提供 [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) 的 URL。 +如要使用 Confluent 提供的 [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) 向关系型或非关系型数据库传输数据,请选择 [Avro 协议](/ticdc/ticdc-avro-protocol.md)协议,并在 `schema-registry` 中提供 [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) 的 URL。 配置样例如下所示: @@ -191,7 +192,7 @@ dispatchers = [ ### TiCDC 集成 AWS Glue Schema Registry -从 v7.4.0 开始,TiCDC 支持在用户选择 Avro 协议同步数据时使用 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) 作为 Schema Registry。配置样例如下所示: +从 v7.4.0 开始,TiCDC 支持在用户选择 [Avro 协议](/ticdc/ticdc-avro-protocol.md)同步数据时使用 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) 作为 Schema Registry。配置样例如下所示: ```shell ./cdc cli changefeed create --server=127.0.0.1:8300 --changefeed-id="kafka-glue-test" --sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --config changefeed_glue.toml