-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36974]support overwrite flink config by command line #3823
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @hiliuxg's nice work! Just left some trivial comments.
assertThat(configMap.get("execution.target")).isEqualTo("yarn-session"); | ||
assertThat(configMap.get("rest.bind-port")).isEqualTo("42689"); | ||
assertThat(configMap.get("yarn.application.id")) | ||
.isEqualTo("application_1714009558476_3563"); | ||
assertThat(configMap.get("rest.bind-address")).isEqualTo("10.1.140.140"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertThat(configMap.get("execution.target")).isEqualTo("yarn-session"); | |
assertThat(configMap.get("rest.bind-port")).isEqualTo("42689"); | |
assertThat(configMap.get("yarn.application.id")) | |
.isEqualTo("application_1714009558476_3563"); | |
assertThat(configMap.get("rest.bind-address")).isEqualTo("10.1.140.140"); | |
assertThat(configMap) | |
.containsEntry("execution.target", "yarn-session") | |
.containsEntry("rest.bind-port", "42689") | |
.containsEntry("yarn.application.id", "application_1714009558476_3563") | |
.containsEntry("rest.bind-address", "10.1.140.140"); |
public static final Option FLINK_CONFIG = | ||
Option.builder("fc") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SQL Client provides -D
to pass extra Flink options dynamically. Maybe we can follow the same naming here?
Configuration flinkConfig, CommandLine commandLine) { | ||
String[] flinkConfigs = commandLine.getOptionValues(FLINK_CONFIG); | ||
if (flinkConfigs != null) { | ||
LOG.info("Find flink config items: {}", String.join(",", flinkConfigs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG.info("Find flink config items: {}", String.join(",", flinkConfigs)); | |
LOG.info("Dynamic flink config items found: {}", flinkConfigs); |
String key = config.split("=")[0].trim(); | ||
String value = config.split("=")[1].trim(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can split just once and verify the format first.
@@ -145,6 +146,31 @@ void testPipelineExecuting() throws Exception { | |||
assertThat(executionInfo.getDescription()).isEqualTo("fake-description"); | |||
} | |||
|
|||
@Test | |||
void testPipelineExecutingWithFlinkConfig() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also add tests for malformed arguments (like when =
is missing, or =
is part of value, etc.)
Hi @yuxiqian, thanks for the review. I have accepted your suggestions and made the necessary modifications. Please review again when you have time. thanks . |
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
Outdated
Show resolved
Hide resolved
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
Outdated
Show resolved
Hide resolved
hi @yuxiqian , thank you for your guidance. I have made another version of the code. Please review it again when you have time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks nice! Thanks for @hiliuxg's quick response.
Could @leonardBang @ruanhang1993 trigger the CI please?
String value = properties.getProperty(key); | ||
if (StringUtils.isNullOrWhitespaceOnly(key) | ||
|| StringUtils.isNullOrWhitespaceOnly(value)) { | ||
throw new IllegalArgumentException("Illegal argument for key or value"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe we can also print out the questionable key / value pair in command line to improve exception traceability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. thanks for comment
@hiliuxg Seems there are some test cases in |
@hiliuxg Also, could you please rebase this branch to latest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the HELP_MESSAGE
in CliFrontendTest
should also be updated to include the -D parameter.
@@ -145,6 +147,70 @@ void testPipelineExecuting() throws Exception { | |||
assertThat(executionInfo.getDescription()).isEqualTo("fake-description"); | |||
} | |||
|
|||
@Test | |||
void testPipelineExecutingWithFlinkConfig() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we need to add some more cases that test for the concatenation of “-D” and key=value pairs instead of space-separated pairs, such as "-Dexecution.target= yarn-session". I'm concerned that there are cases that might not be recognized, such as "-D=execution.target".
Support overwrite flink config in the command line, for example:
bin/flink-cdc.sh1732864461789.yaml --flink-conf execution.checkpointing.interval=10min --flink-conf rest.bind-port=42689 --flink-conf yarn.application.id=application_1714009558476_3563 --flink-conf execution.target=yarn-session --flink-conf rest.bind-address=10.5.140.140
The example provided is used to submit a job to a specified host's YARN session cluster with specific Flink configurations.