Skip to content

Commit

Permalink
cdc: update description about partition dispatcher and add column sel… (
Browse files Browse the repository at this point in the history
  • Loading branch information
Oreoxmt authored Nov 13, 2023
1 parent a8b8f76 commit 2fb9f23
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 13 deletions.
18 changes: 14 additions & 4 deletions ticdc/ticdc-changefeed-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,20 @@ enable-table-across-nodes = false
# Note: When the downstream MQ is Pulsar, if the routing rule for `partition` is not specified as any of `ts`, `index-value`, `table`, or `default`, each Pulsar message will be routed using the string you set as the key.
# For example, if you specify the routing rule for a matcher as the string `code`, then all Pulsar messages that match that matcher will be routed with `code` as the key.
# dispatchers = [
# {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" },
# {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" },
# {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"},
# {matcher = ['test6.*'], partition = "ts"}
# {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "index-value"},
# {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value", index-name="index1"},
# {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"},
# {matcher = ['test6.*'], partition = "columns", columns = "['a', 'b']"}
# {matcher = ['test7.*'], partition = "ts"}
# ]
# column-selectors is introduced in v7.5.0 and only takes effect when the downstream is Kafka.
# column-selectors is used to select specific columns for replication.
# column-selectors = [
# {matcher = ['test.t1'], columns = ['a', 'b']},
# {matcher = ['test.*'], columns = ["*", "!b"]},
# {matcher = ['test1.t1'], columns = ['column*', '!column1']},
# {matcher = ['test3.t'], columns = ["column?", "!column1"]},
# ]
# The protocol configuration item specifies the protocol format used for encoding messages.
Expand Down
64 changes: 55 additions & 9 deletions ticdc/ticdc-sink-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,32 +222,78 @@ For example, for a dispatcher like `matcher = ['test.*'], topic = {schema}_{tabl

### Partition dispatchers

You can use `partition = "xxx"` to specify a partition dispatcher. It supports four dispatchers: `default`, `ts`, `index-value`, and `table`. The dispatcher rules are as follows:
You can use `partition = "xxx"` to specify a partition dispatcher. It supports five dispatchers: `default`, `index-value`, `columns`, `table`, and `ts`. The dispatcher rules are as follows:

- `default`: dispatches events in the `table` mode.
- `ts`: uses the commitTs of the row change to hash and dispatch events.
- `index-value`: uses the value of the primary key or the unique index of the table to hash and dispatch events.
- `table`: uses the schema name of the table and the table name to hash and dispatch events.
- `default`: uses the `table` dispatcher rule by default. It calculates the partition number using the schema name and table name, ensuring data from a table is sent to the same partition. As a result, the data from a single table only exists in one partition and is guaranteed to be ordered. However, this dispatcher rule limits the send throughput, and the consumption speed cannot be improved by adding consumers.
- `index-value`: calculates the partition number using either the primary key, a unique index, or an explicitly specified index, distributing table data across multiple partitions. The data from a single table is sent to multiple partitions, and the data in each partition is ordered. You can improve the consumption speed by adding consumers.
- `columns`: calculates the partition number using the values of explicitly specified columns, distributing table data across multiple partitions. The data from a single table is sent to multiple partitions, and the data in each partition is ordered. You can improve the consumption speed by adding consumers.
- `table`: calculates the partition number using the schema name and table name.
- `ts`: calculates the partition number using the commitTs of the row change, distributing table data across multiple partitions. The data from a single table is sent to multiple partitions, and the data in each partition is ordered. You can improve the consumption speed by adding consumers. However, multiple changes of a data item might be sent to different partitions and the consumer progress of different consumers might be different, which might cause data inconsistency. Therefore, the consumer needs to sort the data from multiple partitions by commitTs before consuming.

Take the following configuration of `dispatchers` as an example:

```toml
[sink]
dispatchers = [
{matcher = ['test.*'], partition = "index-value"},
{matcher = ['test1.*'], partition = "index-value", index-name = "index1"},
{matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]},
{matcher = ['test3.*'], partition = "table"},
]
```

- Tables in the `test` database use the `index-value` dispatcher, which calculates the partition number using the value of the primary key or unique index. If a primary key exists, the primary key is used; otherwise, the shortest unique index is used.
- Tables in the `test1` table use the `index-value` dispatcher and calculate the partition number using values of all columns in the index named `index1`. If the specified index does not exist, an error is reported. Note that the index specified by `index-name` must be a unique index.
- Tables in the `test2` database use the `columns` dispatcher and calculate the partition number using the values of columns `id` and `a`. If any of the columns does not exist, an error is reported.
- Tables in the `test3` database use the `table` dispatcher.
- Tables in the `test4` database use the `default` dispatcher, that is the `table` dispatcher, as they do not match any of the preceding rules.

If a table matches multiple dispatcher rules, the first matching rule takes precedence.

> **Note:**
>
>
> Since v6.1.0, to clarify the meaning of the configuration, the configuration used to specify the partition dispatcher has been changed from `dispatcher` to `partition`, with `partition` being an alias for `dispatcher`. For example, the following two rules are exactly equivalent.
>
> ```
> [sink]
> dispatchers = [
> {matcher = ['*.*'], dispatcher = "ts"},
> {matcher = ['*.*'], partition = "ts"},
> {matcher = ['*.*'], dispatcher = "index-value"},
> {matcher = ['*.*'], partition = "index-value"},
> ]
> ```
>
> However, `dispatcher` and `partition` cannot appear in the same rule. For example, the following rule is invalid.
>
> ```
> {matcher = ['*.*'], dispatcher = "ts", partition = "table"},
> {matcher = ['*.*'], dispatcher = "index-value", partition = "table"},
> ```
## Column selectors
The column selector feature supports selecting columns from events and sending only the data changes related to those columns to the downstream.
Take the following configuration of `column-selectors` as an example:
```toml
[sink]
column-selectors = [
{matcher = ['test.t1'], columns = ['a', 'b']},
{matcher = ['test.*'], columns = ["*", "!b"]},
{matcher = ['test1.t1'], columns = ['column*', '!column1']},
{matcher = ['test3.t'], columns = ["column?", "!column1"]},
]
```
- For table `test.t1`, only columns `a` and `b` are sent.
- For tables in the `test` database (excluding the `t1` table), all columns except `b` are sent.
- For table `test1.t1`, any column starting with `column` is sent, except for `column1`.
- For table `test3.t`, any 7-character column starting with `column` is sent, except for `column1`.
- For tables that do not match any rule, all columns are sent.

> **Note:**
>
> After being filtered by the `column-selectors` rules, the data in the table must have a primary key or unique key to be replicated. Otherwise, the changefeed reports an error when it is created or running.
## Scale out the load of a single large table to multiple TiCDC nodes

This feature splits the data replication range of a single large table into multiple ranges, according to the data volume and the number of modified rows per minute, and it makes the data volume and the number of modified rows replicated in each range approximately the same. This feature distributes these ranges to multiple TiCDC nodes for replication, so that multiple TiCDC nodes can replicate a large single table at the same time. This feature can solve the following two problems:
Expand Down

0 comments on commit 2fb9f23

Please sign in to comment.