[cdc-cli] When using miniCluster, don't check 'FLINK_HOME'.
joyCurry30 committed Dec 11, 2023
1 parent 16dce50 commit f0b378e
Showing 4 changed files with 35 additions and 51 deletions.
37 changes: 5 additions & 32 deletions flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliExecutor.java
@@ -18,38 +18,27 @@

 import com.ververica.cdc.cli.parser.PipelineDefinitionParser;
 import com.ververica.cdc.cli.parser.YamlPipelineDefinitionParser;
-import com.ververica.cdc.cli.utils.FlinkEnvironmentUtils;
 import com.ververica.cdc.common.annotation.VisibleForTesting;
 import com.ververica.cdc.common.configuration.Configuration;
 import com.ververica.cdc.composer.PipelineComposer;
 import com.ververica.cdc.composer.PipelineExecution;
 import com.ververica.cdc.composer.definition.PipelineDef;
 
 import java.nio.file.Path;
-import java.util.List;
 
 /** Executor for doing the composing and submitting logic for {@link CliFrontend}. */
 public class CliExecutor {
 
     private final Path pipelineDefPath;
-    private final Configuration flinkConfig;
     private final Configuration globalPipelineConfig;
-    private final boolean useMiniCluster;
-    private final List<Path> additionalJars;
 
-    private PipelineComposer composer = null;
+    private PipelineComposer composer;
 
     public CliExecutor(
-            Path pipelineDefPath,
-            Configuration flinkConfig,
-            Configuration globalPipelineConfig,
-            boolean useMiniCluster,
-            List<Path> additionalJars) {
+            Path pipelineDefPath, Configuration globalPipelineConfig, PipelineComposer composer) {
         this.pipelineDefPath = pipelineDefPath;
-        this.flinkConfig = flinkConfig;
         this.globalPipelineConfig = globalPipelineConfig;
-        this.useMiniCluster = useMiniCluster;
-        this.additionalJars = additionalJars;
+        this.composer = composer;
     }
 
     public PipelineExecution.ExecutionInfo run() throws Exception {
@@ -58,41 +47,25 @@ public PipelineExecution.ExecutionInfo run() throws Exception {
         PipelineDef pipelineDef =
                 pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
 
-        // Create composer
-        PipelineComposer composer = getComposer(flinkConfig);
-
         // Compose pipeline
         PipelineExecution execution = composer.compose(pipelineDef);
 
         // Execute the pipeline
         return execution.execute();
     }
 
-    private PipelineComposer getComposer(Configuration flinkConfig) {
-        if (composer == null) {
-            return FlinkEnvironmentUtils.createComposer(
-                    useMiniCluster, flinkConfig, additionalJars);
-        }
-        return composer;
-    }
-
-    @VisibleForTesting
-    void setComposer(PipelineComposer composer) {
-        this.composer = composer;
-    }
-
     @VisibleForTesting
-    public Configuration getFlinkConfig() {
-        return flinkConfig;
+    PipelineComposer getComposer() {
+        return composer;
     }
 
     @VisibleForTesting
     public Configuration getGlobalPipelineConfig() {
         return globalPipelineConfig;
     }
-
-    @VisibleForTesting
-    public List<Path> getAdditionalJars() {
-        return additionalJars;
-    }
 }
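
For readability, this is roughly what the slimmed-down CliExecutor looks like once the hunks above are applied. It is a sketch, not a verbatim copy: imports and the license header are omitted, and the parser instantiation at the top of run() sits in collapsed context, so it is assumed from the YamlPipelineDefinitionParser import rather than taken from the diff.

/** Executor for doing the composing and submitting logic for {@link CliFrontend}. */
public class CliExecutor {

    private final Path pipelineDefPath;
    private final Configuration globalPipelineConfig;

    private PipelineComposer composer;

    public CliExecutor(
            Path pipelineDefPath, Configuration globalPipelineConfig, PipelineComposer composer) {
        this.pipelineDefPath = pipelineDefPath;
        this.globalPipelineConfig = globalPipelineConfig;
        this.composer = composer;
    }

    public PipelineExecution.ExecutionInfo run() throws Exception {
        // Parse the pipeline definition file (instantiation assumed from collapsed context).
        PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
        PipelineDef pipelineDef =
                pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);

        // Compose and execute the pipeline with the injected composer.
        PipelineExecution execution = composer.compose(pipelineDef);
        return execution.execute();
    }

    @VisibleForTesting
    PipelineComposer getComposer() {
        return composer;
    }

    @VisibleForTesting
    public Configuration getGlobalPipelineConfig() {
        return globalPipelineConfig;
    }
}

The executor no longer knows anything about Flink configuration, mini-cluster mode, or extra JARs; it simply parses the definition and delegates to whatever PipelineComposer it was given.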
22 changes: 16 additions & 6 deletions flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java
@@ -20,7 +20,9 @@
 import com.ververica.cdc.cli.utils.FlinkEnvironmentUtils;
 import com.ververica.cdc.common.annotation.VisibleForTesting;
 import com.ververica.cdc.common.configuration.Configuration;
+import com.ververica.cdc.composer.PipelineComposer;
 import com.ververica.cdc.composer.PipelineExecution;
+import com.ververica.cdc.composer.flink.FlinkPipelineComposer;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -84,6 +86,18 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
         // Global pipeline configuration
         Configuration globalPipelineConfig = getGlobalConfig(commandLine);
 
+        // Create pipeline composer
+        boolean useMiniCluster = commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER);
+        PipelineComposer composer =
+                useMiniCluster
+                        ? FlinkPipelineComposer.ofMiniCluster()
+                        : createRemoteComposer(commandLine);
+
+        // Build executor
+        return new CliExecutor(pipelineDefPath, globalPipelineConfig, composer);
+    }
+
+    private static PipelineComposer createRemoteComposer(CommandLine commandLine) throws Exception {
         // Load Flink environment
         Path flinkHome = getFlinkHome(commandLine);
         Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
@@ -97,12 +111,8 @@ static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
                         .map(Paths::get)
                         .collect(Collectors.toList());
 
-        // Build executor
-        return new CliExecutor(
-                pipelineDefPath,
-                flinkConfig,
-                globalPipelineConfig,
-                commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER),
+        return FlinkPipelineComposer.ofRemoteCluster(
+                org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
                 additionalJars);
     }

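Read together, the two CliFrontend hunks move composer construction in front of executor construction, so FLINK_HOME (and flink-conf.yaml) is only consulted on the remote-cluster path. A condensed sketch of the resulting flow; getPipelineDefPath and getAdditionalJars are hypothetical stand-ins for the unchanged argument handling that the visible hunks do not show:

static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
    Path pipelineDefPath = getPipelineDefPath(commandLine);       // hypothetical helper: definition-file resolution unchanged, not shown in the hunks
    Configuration globalPipelineConfig = getGlobalConfig(commandLine);

    // Pick the composer first: the mini-cluster branch never touches FLINK_HOME.
    boolean useMiniCluster = commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER);
    PipelineComposer composer =
            useMiniCluster
                    ? FlinkPipelineComposer.ofMiniCluster()
                    : createRemoteComposer(commandLine);
    return new CliExecutor(pipelineDefPath, globalPipelineConfig, composer);
}

private static PipelineComposer createRemoteComposer(CommandLine commandLine) throws Exception {
    // Only the remote path loads the Flink environment from FLINK_HOME / --flink-home.
    Path flinkHome = getFlinkHome(commandLine);
    Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
    List<Path> additionalJars = getAdditionalJars(commandLine);    // hypothetical helper: the --jar collection code is unchanged and abridged here
    return FlinkPipelineComposer.ofRemoteCluster(
            org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
            additionalJars);
}

In practice, a pipeline started in mini-cluster mode no longer needs --flink-home or the FLINK_HOME environment variable at all, which is what the commit title describes.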
12 changes: 0 additions & 12 deletions flink-cdc-cli/src/main/java/com/ververica/cdc/cli/utils/FlinkEnvironmentUtils.java
@@ -17,10 +17,8 @@
 package com.ververica.cdc.cli.utils;
 
 import com.ververica.cdc.common.configuration.Configuration;
-import com.ververica.cdc.composer.flink.FlinkPipelineComposer;
 
 import java.nio.file.Path;
-import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
@@ -32,14 +30,4 @@ public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
         Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
         return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
     }
-
-    public static FlinkPipelineComposer createComposer(
-            boolean useMiniCluster, Configuration flinkConfig, List<Path> additionalJars) {
-        if (useMiniCluster) {
-            return FlinkPipelineComposer.ofMiniCluster();
-        }
-        return FlinkPipelineComposer.ofRemoteCluster(
-                org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
-                additionalJars);
-    }
 }
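
With createComposer gone, FlinkEnvironmentUtils shrinks to configuration loading only. A sketch of what remains; the constant values are assumptions, since their declarations sit outside the shown hunks:

/** Utilities for handling Flink configuration and environment. */
public class FlinkEnvironmentUtils {

    // Assumed values: the constant declarations are not part of this diff.
    private static final String FLINK_CONF_DIR = "conf";
    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";

    public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
        Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
        return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
    }
}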
15 changes: 14 additions & 1 deletion flink-cdc-cli/src/test/java/com/ververica/cdc/cli/CliFrontendTest.java
@@ -16,6 +16,9 @@

 package com.ververica.cdc.cli;
 
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
 import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
 
 import com.ververica.cdc.composer.PipelineComposer;
@@ -28,9 +31,12 @@
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.PrintStream;
+import java.lang.reflect.Field;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -90,7 +96,14 @@ void testAdditionalJar() throws Exception {
         CliExecutor executor =
                 createExecutor(
                         pipelineDef(), "--flink-home", flinkHome(), "--jar", aJar, "--jar", bJar);
-        assertThat(executor.getAdditionalJars()).contains(Paths.get(aJar), Paths.get(bJar));
+        PipelineComposer composer = executor.getComposer();
+        Field field = composer.getClass().getDeclaredField("env");
+        field.setAccessible(true);
+        StreamExecutionEnvironment env = (StreamExecutionEnvironment) field.get(composer);
+        List<String> jars = env.getConfiguration().get(PipelineOptions.JARS);
+        String aJarPath = new File(aJar).toURI().toString();
+        String bJarPath = new File(bJar).toURI().toString();
+        assertThat(jars).contains(aJarPath, bJarPath);
     }
 
     @Test
