[cdc-cli][cdc-composer] Add 'flink-config' for pipeline yaml, which will overwrite the config from 'flink-conf.yaml'.
joyCurry30 committed Dec 11, 2023
1 parent f0b378e commit a1e60e4
Showing 6 changed files with 58 additions and 11 deletions.
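
Taken together, the change threads an optional 'flink-config' block from the pipeline YAML through PipelineDef into the composer, where it is applied to the StreamExecutionEnvironment. A minimal sketch of the resulting flow; the import paths for the CDC classes and the YAML path are assumptions (only the composer package, com.ververica.cdc.composer.flink, is visible in this diff):

import com.ververica.cdc.cli.parser.YamlPipelineDefinitionParser; // package assumed
import com.ververica.cdc.common.configuration.Configuration;      // package assumed
import com.ververica.cdc.composer.definition.PipelineDef;         // package assumed

import java.nio.file.Paths;

public class FlinkConfigFlowSketch {
    public static void main(String[] args) throws Exception {
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();

        // "pipeline.yaml" is a placeholder path; no global pipeline config here.
        PipelineDef pipelineDef = parser.parse(Paths.get("pipeline.yaml"), new Configuration());

        // New in this commit: the optional 'flink-config' block rides along on the
        // definition and is later applied by FlinkPipelineComposer via env.configure(...),
        // overriding same-named keys picked up from flink-conf.yaml.
        Configuration flinkConfig = pipelineDef.getFlinkConfig();
        System.out.println(flinkConfig.toMap());
    }
}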
@@ -43,6 +43,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String SINK_KEY = "sink";
private static final String ROUTE_KEY = "route";
private static final String PIPELINE_KEY = "pipeline";
private static final String FLINK_CONFIG_KEY = "flink-config";

// Source / sink keys
private static final String TYPE_KEY = "type";
@@ -85,12 +86,15 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi
// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));

// Flink configs are optional
Configuration flinkConfig = toFlinkConfig(root.get(FLINK_CONFIG_KEY));

// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig);
return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig, flinkConfig);
}
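
A side note on the merge above: globalPipelineConfig is added first and userPipelineConfig second, so on key collisions the values from the pipeline YAML win. A tiny sketch of that precedence, assuming Configuration.addAll is last-write-wins (the merge order relies on this) and using illustrative keys:

import com.google.common.collect.ImmutableMap;
import com.ververica.cdc.common.configuration.Configuration; // package assumed

public class MergeOrderSketch {
    public static void main(String[] args) {
        Configuration merged = new Configuration();
        // Defaults from the global pipeline config (illustrative values).
        merged.addAll(Configuration.fromMap(
                ImmutableMap.of("parallelism", "1", "name", "global-pipe")));
        // The user's pipeline YAML overwrites on collision.
        merged.addAll(Configuration.fromMap(ImmutableMap.of("parallelism", "4")));
        System.out.println(merged.toMap()); // parallelism=4 wins, name=global-pipe kept
    }
}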

private SourceDef toSourceDef(JsonNode sourceNode) {
@@ -147,6 +151,15 @@ private RouteDef toRouteDef(JsonNode routeNode) {
return new RouteDef(sourceTable, sinkTable, description);
}

private Configuration toFlinkConfig(JsonNode flinkConfigNode) {
if (flinkConfigNode == null || flinkConfigNode.isNull()) {
return new Configuration();
}
Map<String, String> flinkConfigMap =
mapper.convertValue(flinkConfigNode, new TypeReference<Map<String, String>>() {});
return Configuration.fromMap(flinkConfigMap);
}

private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
return new Configuration();
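
For reference, the new toFlinkConfig in isolation: Jackson coerces the scalar values of the mapping node to strings, so a numeric YAML value such as 10000 arrives as "10000". A standalone sketch, assuming the parser's mapper field is a YAML-backed ObjectMapper (jackson-dataformat-yaml):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.util.Map;

public class ToFlinkConfigSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        JsonNode root = mapper.readTree(
                "flink-config:\n"
                        + "  execution.checkpointing.interval: 10000\n"
                        + "  execution.checkpointing.mode: EXACTLY_ONCE\n");
        // The same conversion toFlinkConfig performs on the 'flink-config' node.
        Map<String, String> flinkConfigMap = mapper.convertValue(
                root.get("flink-config"), new TypeReference<Map<String, String>>() {});
        System.out.println(flinkConfigMap);
        // {execution.checkpointing.interval=10000, execution.checkpointing.mode=EXACTLY_ONCE}
    }
}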
@@ -120,7 +120,7 @@ void testValidTimeZone() throws Exception {
}

@Test
void testInvalidTimeZone() throws Exception {
void testInvalidTimeZone() {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
assertThatThrownBy(
@@ -183,6 +183,11 @@ void testInvalidTimeZone() throws Exception {
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.build()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("execution.checkpointing.interval", "10000")
.put("execution.checkpointing.mode", "EXACTLY_ONCE")
.build()));

private final PipelineDef fullDefWithGlobalConf =
@@ -228,6 +233,11 @@ void testInvalidTimeZone() throws Exception {
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.put("foo", "bar")
.build()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("execution.checkpointing.interval", "10000")
.put("execution.checkpointing.mode", "EXACTLY_ONCE")
.build()));

private final PipelineDef defWithOptional =
@@ -257,15 +267,15 @@ void testInvalidTimeZone() throws Exception {
"mydb.default.app_order_.*", "odsdb.default.app_order", null)),
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "4")
.build()));
ImmutableMap.<String, String>builder().put("parallelism", "4").build()),
new Configuration());

private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
new SinkDef("kafka", null, new Configuration()),
Collections.emptyList(),
null,
new Configuration(),
new Configuration());
}
@@ -51,3 +51,7 @@ pipeline:
name: source-database-sync-pipe
parallelism: 4
enable-schema-evolution: false

flink-config:
execution.checkpointing.interval: 10000
execution.checkpointing.mode: EXACTLY_ONCE
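
Per the commit message, keys set in this block overwrite same-named settings from flink-conf.yaml, because the composer applies them to an environment that has already absorbed the cluster configuration. A sketch of that overwrite behavior, with enableCheckpointing(60_000L) standing in for a value inherited from flink-conf.yaml:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

public class FlinkConfigOverwriteSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // stand-in for a value from flink-conf.yaml

        // The 'flink-config' block above, as the parser hands it to the composer.
        Map<String, String> fromPipelineYaml = new HashMap<>();
        fromPipelineYaml.put("execution.checkpointing.interval", "10000");
        fromPipelineYaml.put("execution.checkpointing.mode", "EXACTLY_ONCE");

        env.configure(
                Configuration.fromMap(fromPipelineYaml),
                Thread.currentThread().getContextClassLoader());

        // The later configure(...) call wins: the interval is now 10000 ms.
        System.out.println(env.getCheckpointConfig().getCheckpointInterval());
    }
}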
@@ -53,18 +53,21 @@ public class PipelineDef {
private final List<RouteDef> routes;
private final List<TransformDef> transforms;
private final Configuration config;
private final Configuration flinkConfig;

public PipelineDef(
SourceDef source,
SinkDef sink,
List<RouteDef> routes,
List<TransformDef> transforms,
Configuration config) {
Configuration config,
Configuration flinkConfig) {
this.source = source;
this.sink = sink;
this.routes = routes;
this.transforms = transforms;
this.config = evaluatePipelineTimeZone(config);
this.flinkConfig = flinkConfig;
}

public SourceDef getSource() {
@@ -87,6 +90,10 @@ public Configuration getConfig() {
return config;
}

public Configuration getFlinkConfig() {
return flinkConfig;
}

@Override
public String toString() {
return "PipelineDef{"
@@ -100,6 +107,8 @@ public String toString() {
+ transforms
+ ", config="
+ config
+ ", flinkConfig="
+ flinkConfig
+ '}';
}

@@ -116,12 +125,13 @@ public boolean equals(Object o) {
&& Objects.equals(sink, that.sink)
&& Objects.equals(routes, that.routes)
&& Objects.equals(transforms, that.transforms)
&& Objects.equals(config, that.config);
&& Objects.equals(config, that.config)
&& Objects.equals(flinkConfig, that.flinkConfig);
}

@Override
public int hashCode() {
return Objects.hash(source, sink, routes, transforms, config);
return Objects.hash(source, sink, routes, transforms, config, flinkConfig);
}

// ------------------------------------------------------------------------
@@ -17,6 +17,7 @@
package com.ververica.cdc.composer.flink;

import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@@ -95,6 +96,12 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

// Flink config
Configuration flinkConfig = pipelineDef.getFlinkConfig();
ReadableConfig flinkConfigOptions =
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
env.configure(flinkConfigOptions, Thread.currentThread().getContextClassLoader());

// Source
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
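
The toMap/fromMap detour in compose() above exists because env.configure expects Flink's own ReadableConfig, while PipelineDef carries the CDC Configuration type; a plain Map<String, String> bridges the two. A minimal sketch of the conversion (the CDC import path is an assumption):

import org.apache.flink.configuration.ReadableConfig;

import java.util.Collections;
import java.util.Map;

public class ConfigBridgeSketch {
    public static void main(String[] args) {
        // CDC-side configuration, as carried by PipelineDef (package assumed).
        com.ververica.cdc.common.configuration.Configuration cdcConfig =
                com.ververica.cdc.common.configuration.Configuration.fromMap(
                        Collections.singletonMap("execution.checkpointing.mode", "EXACTLY_ONCE"));

        // Flatten to a plain map, then rebuild as Flink's Configuration for env.configure(...).
        Map<String, String> asMap = cdcConfig.toMap();
        ReadableConfig flinkSide = org.apache.flink.configuration.Configuration.fromMap(asMap);

        System.out.println(flinkSide);
    }
}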
@@ -116,7 +116,7 @@ void testSingleSplitSingleTable() throws Exception {
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);
pipelineConfig,
new Configuration());

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
@@ -170,7 +171,8 @@ void testSingleSplitMultipleTables() throws Exception {
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);
pipelineConfig,
new Configuration());

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
@@ -234,7 +236,8 @@ void testMultiSplitsSingleTable() throws Exception {
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);
pipelineConfig,
new Configuration());

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
