diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java index ecf60f9cbe7..54184a65954 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -43,6 +43,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { private static final String SINK_KEY = "sink"; private static final String ROUTE_KEY = "route"; private static final String PIPELINE_KEY = "pipeline"; + private static final String FLINK_CONFIG_KEY = "flink-config"; // Source / sink keys private static final String TYPE_KEY = "type"; @@ -85,12 +86,15 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi // Pipeline configs are optional Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY)); + // Flink configs are optional + Configuration flinkConfig = toFlinkConfig(root.get(FLINK_CONFIG_KEY)); + // Merge user config into global config Configuration pipelineConfig = new Configuration(); pipelineConfig.addAll(globalPipelineConfig); pipelineConfig.addAll(userPipelineConfig); - return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig); + return new PipelineDef(sourceDef, sinkDef, routeDefs, null, pipelineConfig, flinkConfig); } private SourceDef toSourceDef(JsonNode sourceNode) { @@ -147,6 +151,15 @@ private RouteDef toRouteDef(JsonNode routeNode) { return new RouteDef(sourceTable, sinkTable, description); } + private Configuration toFlinkConfig(JsonNode flinkConfigNode) { + if (flinkConfigNode == null || flinkConfigNode.isNull()) { + return new Configuration(); + } + Map flinkConfigMap = + mapper.convertValue(flinkConfigNode, new TypeReference>() {}); + return Configuration.fromMap(flinkConfigMap); + } + private Configuration toPipelineConfig(JsonNode pipelineConfigNode) { if (pipelineConfigNode == null || pipelineConfigNode.isNull()) { return new Configuration(); diff --git a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 045b1fe7a7e..21fdb64ed37 100644 --- a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -120,7 +120,7 @@ void testValidTimeZone() throws Exception { } @Test - void testInvalidTimeZone() throws Exception { + void testInvalidTimeZone() { URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml"); YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); assertThatThrownBy( @@ -183,6 +183,11 @@ void testInvalidTimeZone() throws Exception { .put("name", "source-database-sync-pipe") .put("parallelism", "4") .put("enable-schema-evolution", "false") + .build()), + Configuration.fromMap( + ImmutableMap.builder() + .put("execution.checkpointing.interval", "10000") + .put("execution.checkpointing.mode", "EXACTLY_ONCE") .build())); private final PipelineDef fullDefWithGlobalConf = @@ -228,6 +233,11 @@ void testInvalidTimeZone() throws Exception { .put("parallelism", "4") .put("enable-schema-evolution", "false") .put("foo", "bar") + .build()), + Configuration.fromMap( + ImmutableMap.builder() + .put("execution.checkpointing.interval", "10000") + .put("execution.checkpointing.mode", "EXACTLY_ONCE") .build())); private final PipelineDef defWithOptional = @@ -257,9 +267,8 @@ void testInvalidTimeZone() throws Exception { "mydb.default.app_order_.*", "odsdb.default.app_order", null)), null, Configuration.fromMap( - ImmutableMap.builder() - .put("parallelism", "4") - .build())); + ImmutableMap.builder().put("parallelism", "4").build()), + new Configuration()); private final PipelineDef minimizedDef = new PipelineDef( @@ -267,5 +276,6 @@ void testInvalidTimeZone() throws Exception { new SinkDef("kafka", null, new Configuration()), Collections.emptyList(), null, + new Configuration(), new Configuration()); } diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml index a719b45fff5..2ae54a49c96 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml @@ -51,3 +51,7 @@ pipeline: name: source-database-sync-pipe parallelism: 4 enable-schema-evolution: false + +flink-config: + execution.checkpointing.interval: 10000 + execution.checkpointing.mode: EXACTLY_ONCE diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java index 6c65ae05d1e..5f67cd9f5cb 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/definition/PipelineDef.java @@ -53,18 +53,21 @@ public class PipelineDef { private final List routes; private final List transforms; private final Configuration config; + private final Configuration flinkConfig; public PipelineDef( SourceDef source, SinkDef sink, List routes, List transforms, - Configuration config) { + Configuration config, + Configuration flinkConfig) { this.source = source; this.sink = sink; this.routes = routes; this.transforms = transforms; this.config = evaluatePipelineTimeZone(config); + this.flinkConfig = flinkConfig; } public SourceDef getSource() { @@ -87,6 +90,10 @@ public Configuration getConfig() { return config; } + public Configuration getFlinkConfig() { + return flinkConfig; + } + @Override public String toString() { return "PipelineDef{" @@ -100,6 +107,8 @@ public String toString() { + transforms + ", config=" + config + + ", flinkConfig=" + + flinkConfig + '}'; } @@ -116,12 +125,13 @@ public boolean equals(Object o) { && Objects.equals(sink, that.sink) && Objects.equals(routes, that.routes) && Objects.equals(transforms, that.transforms) - && Objects.equals(config, that.config); + && Objects.equals(config, that.config) + && Objects.equals(flinkConfig, that.flinkConfig); } @Override public int hashCode() { - return Objects.hash(source, sink, routes, transforms, config); + return Objects.hash(source, sink, routes, transforms, config, flinkConfig); } // ------------------------------------------------------------------------ diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java index 0e27055513c..d9480fc7148 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java @@ -17,6 +17,7 @@ package com.ververica.cdc.composer.flink; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -95,6 +96,12 @@ public PipelineExecution compose(PipelineDef pipelineDef) { int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM); env.getConfig().setParallelism(parallelism); + // Flink config + Configuration flinkConfig = pipelineDef.getFlinkConfig(); + ReadableConfig flinkConfigOptions = + org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()); + env.configure(flinkConfigOptions, Thread.currentThread().getContextClassLoader()); + // Source DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream stream = diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java index cb0758923b3..31bcb7f66ac 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -116,7 +116,8 @@ void testSingleSplitSingleTable() throws Exception { sinkDef, Collections.emptyList(), Collections.emptyList(), - pipelineConfig); + pipelineConfig, + new Configuration()); // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef); @@ -170,7 +171,8 @@ void testSingleSplitMultipleTables() throws Exception { sinkDef, Collections.emptyList(), Collections.emptyList(), - pipelineConfig); + pipelineConfig, + new Configuration()); // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef); @@ -234,7 +236,8 @@ void testMultiSplitsSingleTable() throws Exception { sinkDef, Collections.emptyList(), Collections.emptyList(), - pipelineConfig); + pipelineConfig, + new Configuration()); // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef);