diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliExecutor.java index 6de2513fc49..529e5a9fdb8 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliExecutor.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliExecutor.java @@ -18,7 +18,6 @@ import com.ververica.cdc.cli.parser.PipelineDefinitionParser; import com.ververica.cdc.cli.parser.YamlPipelineDefinitionParser; -import com.ververica.cdc.cli.utils.FlinkEnvironmentUtils; import com.ververica.cdc.common.annotation.VisibleForTesting; import com.ververica.cdc.common.configuration.Configuration; import com.ververica.cdc.composer.PipelineComposer; @@ -26,30 +25,20 @@ import com.ververica.cdc.composer.definition.PipelineDef; import java.nio.file.Path; -import java.util.List; /** Executor for doing the composing and submitting logic for {@link CliFrontend}. */ public class CliExecutor { private final Path pipelineDefPath; - private final Configuration flinkConfig; private final Configuration globalPipelineConfig; - private final boolean useMiniCluster; - private final List additionalJars; - private PipelineComposer composer = null; + private PipelineComposer composer; public CliExecutor( - Path pipelineDefPath, - Configuration flinkConfig, - Configuration globalPipelineConfig, - boolean useMiniCluster, - List additionalJars) { + Path pipelineDefPath, Configuration globalPipelineConfig, PipelineComposer composer) { this.pipelineDefPath = pipelineDefPath; - this.flinkConfig = flinkConfig; this.globalPipelineConfig = globalPipelineConfig; - this.useMiniCluster = useMiniCluster; - this.additionalJars = additionalJars; + this.composer = composer; } public PipelineExecution.ExecutionInfo run() throws Exception { @@ -58,9 +47,6 @@ public PipelineExecution.ExecutionInfo run() throws Exception { PipelineDef pipelineDef = pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig); - // Create composer - PipelineComposer composer = getComposer(flinkConfig); - // Compose pipeline PipelineExecution execution = composer.compose(pipelineDef); @@ -68,31 +54,18 @@ public PipelineExecution.ExecutionInfo run() throws Exception { return execution.execute(); } - private PipelineComposer getComposer(Configuration flinkConfig) { - if (composer == null) { - return FlinkEnvironmentUtils.createComposer( - useMiniCluster, flinkConfig, additionalJars); - } - return composer; - } - @VisibleForTesting void setComposer(PipelineComposer composer) { this.composer = composer; } @VisibleForTesting - public Configuration getFlinkConfig() { - return flinkConfig; + PipelineComposer getComposer() { + return composer; } @VisibleForTesting public Configuration getGlobalPipelineConfig() { return globalPipelineConfig; } - - @VisibleForTesting - public List getAdditionalJars() { - return additionalJars; - } } diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java index 57556dcc9e4..4f45d20b2a8 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java @@ -20,7 +20,9 @@ import com.ververica.cdc.cli.utils.FlinkEnvironmentUtils; import com.ververica.cdc.common.annotation.VisibleForTesting; import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.composer.PipelineComposer; import com.ververica.cdc.composer.PipelineExecution; +import com.ververica.cdc.composer.flink.FlinkPipelineComposer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -84,6 +86,18 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception { // Global pipeline configuration Configuration globalPipelineConfig = getGlobalConfig(commandLine); + // Create pipeline composer + boolean useMiniCluster = commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER); + PipelineComposer composer = + useMiniCluster + ? FlinkPipelineComposer.ofMiniCluster() + : createRemoteComposer(commandLine); + + // Build executor + return new CliExecutor(pipelineDefPath, globalPipelineConfig, composer); + } + + private static PipelineComposer createRemoteComposer(CommandLine commandLine) throws Exception { // Load Flink environment Path flinkHome = getFlinkHome(commandLine); Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome); @@ -97,12 +111,8 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception { .map(Paths::get) .collect(Collectors.toList()); - // Build executor - return new CliExecutor( - pipelineDefPath, - flinkConfig, - globalPipelineConfig, - commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER), + return FlinkPipelineComposer.ofRemoteCluster( + org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()), additionalJars); } diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/utils/FlinkEnvironmentUtils.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/utils/FlinkEnvironmentUtils.java index c868c3fd61e..8ea086dd847 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/utils/FlinkEnvironmentUtils.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/utils/FlinkEnvironmentUtils.java @@ -17,10 +17,8 @@ package com.ververica.cdc.cli.utils; import com.ververica.cdc.common.configuration.Configuration; -import com.ververica.cdc.composer.flink.FlinkPipelineComposer; import java.nio.file.Path; -import java.util.List; /** Utilities for handling Flink configuration and environment. */ public class FlinkEnvironmentUtils { @@ -32,14 +30,4 @@ public static Configuration loadFlinkConfiguration(Path flinkHome) throws Except Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME); return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath); } - - public static FlinkPipelineComposer createComposer( - boolean useMiniCluster, Configuration flinkConfig, List additionalJars) { - if (useMiniCluster) { - return FlinkPipelineComposer.ofMiniCluster(); - } - return FlinkPipelineComposer.ofRemoteCluster( - org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()), - additionalJars); - } } diff --git a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/CliFrontendTest.java b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/CliFrontendTest.java index f008ff68482..59155e7bdf1 100644 --- a/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/CliFrontendTest.java +++ b/flink-cdc-cli/src/test/java/com/ververica/cdc/cli/CliFrontendTest.java @@ -16,6 +16,9 @@ package com.ververica.cdc.cli; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + import org.apache.flink.shaded.guava31.com.google.common.io.Resources; import com.ververica.cdc.composer.PipelineComposer; @@ -28,9 +31,12 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.PrintStream; +import java.lang.reflect.Field; import java.net.URL; import java.nio.file.Paths; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -90,7 +96,14 @@ void testAdditionalJar() throws Exception { CliExecutor executor = createExecutor( pipelineDef(), "--flink-home", flinkHome(), "--jar", aJar, "--jar", bJar); - assertThat(executor.getAdditionalJars()).contains(Paths.get(aJar), Paths.get(bJar)); + PipelineComposer composer = executor.getComposer(); + Field field = composer.getClass().getDeclaredField("env"); + field.setAccessible(true); + StreamExecutionEnvironment env = (StreamExecutionEnvironment) field.get(composer); + List jars = env.getConfiguration().get(PipelineOptions.JARS); + String aJarPath = new File(aJar).toURI().toString(); + String bJarPath = new File(bJar).toURI().toString(); + assertThat(jars).contains(aJarPath, bJarPath); } @Test