[FLINK-36794] [cdc-composer/cli] pipeline cdc connector support multiple data sources #3844

Open · wants to merge 1 commit into master
30 changes: 28 additions & 2 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -50,9 +50,9 @@ The MySQL CDC Pipeline connector allows reading snapshot data and incremental data from MySQL databases
</table>
</div>

## Example
## Single Data Source Example

A pipeline that reads data from MySQL and syncs it to Doris can be defined as follows:
With a single data source, a pipeline that reads data from one MySQL instance and syncs it to Doris can be defined as follows:

```yaml
source:
@@ -77,6 +77,32 @@ pipeline:
  parallelism: 4
```

## Multiple Data Source Example

With multiple data sources, a pipeline that reads data from several MySQL instances and syncs it to Doris can be defined as follows:

```yaml
source:
  type: mysql_mutiple
  name: MySQL mutiple Source
  host_list: 127.0.0.1:3306,127.0.0.2:3307
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
  name: MySQL to Doris Pipeline
  parallelism: 4
```

Reviewer comment (Contributor) on `type: mysql_mutiple`: Should we use a new key like 'sources' to describe multiple sources? The '_multiple' suffix in the value seems a bit odd, because the YAML content does not correspond one-to-one with the PipelineDef.
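
For illustration, a hypothetical shape along the lines of that suggestion; the `sources` list key does not exist in this PR, and the field values below are simply reused from the example above:

```yaml
# Hypothetical alternative suggested in review; not part of this PR.
sources:
  - type: mysql
    name: MySQL Source A
    hostname: 127.0.0.1
    port: 3306
    username: admin
    password: pass
    tables: adb.\.*
    server-id: 5401-5402
  - type: mysql
    name: MySQL Source B
    hostname: 127.0.0.2
    port: 3307
    username: admin
    password: pass
    tables: bdb.user_table_[0-9]+
    server-id: 5403-5404
```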

## Connector Options

<div class="highlight">
30 changes: 28 additions & 2 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
</table>
</div>

## Example
## Single Data Source Example

An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
An example of a pipeline that reads data from a single MySQL instance and sinks to Doris can be defined as follows:

```yaml
source:
@@ -78,6 +78,32 @@ pipeline:
  parallelism: 4
```

## Multiple Data Source Example

An example of a pipeline that reads data from multiple MySQL instances and sinks to Doris can be defined as follows:

```yaml
source:
  type: mysql_mutiple
  name: MySQL mutiple Source
  host_list: 127.0.0.1:3306,127.0.0.2:3307
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
  name: MySQL to Doris Pipeline
  parallelism: 4
```
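
Based on the parser change in this PR (see `YamlPipelineDefinitionParser` below), the single `mysql_mutiple` source is expanded into one source definition per entry in `host_list`, with the type rewritten to the prefix before the underscore and all other options copied as-is. Conceptually, the example above behaves like the following two single-host sources (an illustration only, not YAML the parser actually emits):

```yaml
# Illustration of the host_list expansion performed by the parser.
- type: mysql
  name: MySQL mutiple Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404
- type: mysql
  name: MySQL mutiple Source
  hostname: 127.0.0.2
  port: 3307
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404
```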

## Connector Options

<div class="highlight">
@@ -97,6 +97,13 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {

public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";

private static final String HOST_LIST = "host_list";
private static final String COMMA = ",";
private static final String HOST_NAME = "hostname";
private static final String PORT = "port";
private static final String COLON = ":";
private static final String MUTIPLE = "_mutiple";
Reviewer comment (Contributor) on `MUTIPLE = "_mutiple"`: Should be `_multiple`.


private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

/** Parse the specified pipeline definition file. */
@@ -135,13 +142,35 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

// JsonNode root = mapper.readTree(pipelineDefPath.toFile());
JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
JsonNode hostList = sourceNode.get(HOST_LIST);
String type = sourceNode.get(TYPE_KEY).asText();
List<SourceDef> sourceDefs = new ArrayList<>();
if (hostList != null && type.contains(MUTIPLE)) {
String hostString = hostList.asText();
String[] hosts = hostString.split(COMMA);
Arrays.stream(hosts)
.forEach(
e -> {
((ObjectNode) sourceNode)
.put(TYPE_KEY, type.substring(0, type.indexOf("_")));
((ObjectNode) sourceNode).put(HOST_NAME, e.split(COLON)[0]);
((ObjectNode) sourceNode).put(PORT, e.split(COLON)[1]);
((ObjectNode) sourceNode).remove(HOST_LIST);
getSourceDefs(sourceNode, sourceDefs);
});
} else {
// Source is required
getSourceDefs(sourceNode, sourceDefs);
}
// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
// SourceDef sourceDef =
// toSourceDef(
// checkNotNull(
// pipelineDefJsonNode.get(SOURCE_KEY),
// "Missing required field \"%s\" in pipeline definition",
// SOURCE_KEY));

// Sink is required
SinkDef sinkDef =
Expand Down Expand Up @@ -171,7 +200,17 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
sourceDefs, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
}

private void getSourceDefs(JsonNode root, List<SourceDef> sourceDefs) {
SourceDef sourceDef =
toSourceDef(
checkNotNull(
root,
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
sourceDefs.add(sourceDef);
}

private SourceDef toSourceDef(JsonNode sourceNode) {
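
The parse loop above mutates the shared `sourceNode` on every iteration before handing it to `getSourceDefs`, which works only because `toSourceDef` reads the values immediately. A minimal sketch of the same `host_list` expansion using per-host copies instead, assuming just Jackson's `JsonNode.deepCopy()`; the class and helper names are illustrative and not part of the PR:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.List;

class HostListExpansion {
    // Sketch: expand a "host_list" source node into one node per host
    // without mutating the original node. Illustrative only.
    static List<ObjectNode> expandHostList(JsonNode sourceNode) {
        String type = sourceNode.get("type").asText();
        // "mysql_mutiple" -> "mysql", mirroring type.substring(0, type.indexOf("_"))
        String baseType = type.substring(0, type.indexOf('_'));
        List<ObjectNode> expanded = new ArrayList<>();
        for (String host : sourceNode.get("host_list").asText().split(",")) {
            String[] hostAndPort = host.trim().split(":");
            ObjectNode copy = sourceNode.deepCopy(); // leave the shared node untouched
            copy.put("type", baseType);
            copy.put("hostname", hostAndPort[0]);
            copy.put("port", hostAndPort[1]);
            copy.remove("host_list");
            expanded.add(copy);
        }
        return expanded;
    }
}
```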