diff --git a/ticdc/ticdc-changefeed-config.md b/ticdc/ticdc-changefeed-config.md index 119cb574c2b7e..9c9e26d607989 100644 --- a/ticdc/ticdc-changefeed-config.md +++ b/ticdc/ticdc-changefeed-config.md @@ -121,10 +121,20 @@ enable-table-across-nodes = false # Note: When the downstream MQ is Pulsar, if the routing rule for `partition` is not specified as any of `ts`, `index-value`, `table`, or `default`, each Pulsar message will be routed using the string you set as the key. # For example, if you specify the routing rule for a matcher as the string `code`, then all Pulsar messages that match that matcher will be routed with `code` as the key. # dispatchers = [ -# {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, -# {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, -# {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, -# {matcher = ['test6.*'], partition = "ts"} +# {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "index-value"}, +# {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value", index-name="index1"}, +# {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, +# {matcher = ['test6.*'], partition = "columns", columns = "['a', 'b']"} +# {matcher = ['test7.*'], partition = "ts"} +# ] + +# column-selectors is introduced in v7.5.0 and only takes effect when the downstream is Kafka. +# column-selectors is used to select specific columns for replication. +# column-selectors = [ +# {matcher = ['test.t1'], columns = ['a', 'b']}, +# {matcher = ['test.*'], columns = ["*", "!b"]}, +# {matcher = ['test1.t1'], columns = ['column*', '!column1']}, +# {matcher = ['test3.t'], columns = ["column?", "!column1"]}, # ] # The protocol configuration item specifies the protocol format used for encoding messages. diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index d22f418e72fcc..ba5f5d30602dc 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -222,32 +222,78 @@ For example, for a dispatcher like `matcher = ['test.*'], topic = {schema}_{tabl ### Partition dispatchers -You can use `partition = "xxx"` to specify a partition dispatcher. It supports four dispatchers: `default`, `ts`, `index-value`, and `table`. The dispatcher rules are as follows: +You can use `partition = "xxx"` to specify a partition dispatcher. It supports five dispatchers: `default`, `index-value`, `columns`, `table`, and `ts`. The dispatcher rules are as follows: -- `default`: dispatches events in the `table` mode. -- `ts`: uses the commitTs of the row change to hash and dispatch events. -- `index-value`: uses the value of the primary key or the unique index of the table to hash and dispatch events. -- `table`: uses the schema name of the table and the table name to hash and dispatch events. +- `default`: uses the `table` dispatcher rule by default. It calculates the partition number using the schema name and table name, ensuring data from a table is sent to the same partition. As a result, the data from a single table only exists in one partition and is guaranteed to be ordered. However, this dispatcher rule limits the send throughput, and the consumption speed cannot be improved by adding consumers. +- `index-value`: calculates the partition number using either the primary key, a unique index, or an explicitly specified index, distributing table data across multiple partitions. The data from a single table is sent to multiple partitions, and the data in each partition is ordered. You can improve the consumption speed by adding consumers. +- `columns`: calculates the partition number using the values of explicitly specified columns, distributing table data across multiple partitions. The data from a single table is sent to multiple partitions, and the data in each partition is ordered. You can improve the consumption speed by adding consumers. +- `table`: calculates the partition number using the schema name and table name. +- `ts`: calculates the partition number using the commitTs of the row change, distributing table data across multiple partitions. The data from a single table is sent to multiple partitions, and the data in each partition is ordered. You can improve the consumption speed by adding consumers. However, multiple changes of a data item might be sent to different partitions and the consumer progress of different consumers might be different, which might cause data inconsistency. Therefore, the consumer needs to sort the data from multiple partitions by commitTs before consuming. + +Take the following configuration of `dispatchers` as an example: + +```toml +[sink] +dispatchers = [ + {matcher = ['test.*'], partition = "index-value"}, + {matcher = ['test1.*'], partition = "index-value", index-name = "index1"}, + {matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]}, + {matcher = ['test3.*'], partition = "table"}, +] +``` + +- Tables in the `test` database use the `index-value` dispatcher, which calculates the partition number using the value of the primary key or unique index. If a primary key exists, the primary key is used; otherwise, the shortest unique index is used. +- Tables in the `test1` table use the `index-value` dispatcher and calculate the partition number using values of all columns in the index named `index1`. If the specified index does not exist, an error is reported. Note that the index specified by `index-name` must be a unique index. +- Tables in the `test2` database use the `columns` dispatcher and calculate the partition number using the values of columns `id` and `a`. If any of the columns does not exist, an error is reported. +- Tables in the `test3` database use the `table` dispatcher. +- Tables in the `test4` database use the `default` dispatcher, that is the `table` dispatcher, as they do not match any of the preceding rules. + +If a table matches multiple dispatcher rules, the first matching rule takes precedence. > **Note:** > -> > Since v6.1.0, to clarify the meaning of the configuration, the configuration used to specify the partition dispatcher has been changed from `dispatcher` to `partition`, with `partition` being an alias for `dispatcher`. For example, the following two rules are exactly equivalent. > > ``` > [sink] > dispatchers = [ -> {matcher = ['*.*'], dispatcher = "ts"}, -> {matcher = ['*.*'], partition = "ts"}, +> {matcher = ['*.*'], dispatcher = "index-value"}, +> {matcher = ['*.*'], partition = "index-value"}, > ] > ``` > > However, `dispatcher` and `partition` cannot appear in the same rule. For example, the following rule is invalid. > > ``` -> {matcher = ['*.*'], dispatcher = "ts", partition = "table"}, +> {matcher = ['*.*'], dispatcher = "index-value", partition = "table"}, > ``` +## Column selectors + +The column selector feature supports selecting columns from events and sending only the data changes related to those columns to the downstream. + +Take the following configuration of `column-selectors` as an example: + +```toml +[sink] +column-selectors = [ + {matcher = ['test.t1'], columns = ['a', 'b']}, + {matcher = ['test.*'], columns = ["*", "!b"]}, + {matcher = ['test1.t1'], columns = ['column*', '!column1']}, + {matcher = ['test3.t'], columns = ["column?", "!column1"]}, +] +``` + +- For table `test.t1`, only columns `a` and `b` are sent. +- For tables in the `test` database (excluding the `t1` table), all columns except `b` are sent. +- For table `test1.t1`, any column starting with `column` is sent, except for `column1`. +- For table `test3.t`, any 7-character column starting with `column` is sent, except for `column1`. +- For tables that do not match any rule, all columns are sent. + +> **Note:** +> +> After being filtered by the `column-selectors` rules, the data in the table must have a primary key or unique key to be replicated. Otherwise, the changefeed reports an error when it is created or running. + ## Scale out the load of a single large table to multiple TiCDC nodes This feature splits the data replication range of a single large table into multiple ranges, according to the data volume and the number of modified rows per minute, and it makes the data volume and the number of modified rows replicated in each range approximately the same. This feature distributes these ranges to multiple TiCDC nodes for replication, so that multiple TiCDC nodes can replicate a large single table at the same time. This feature can solve the following two problems: