diff --git a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java index d249d43d3789..e53de5bdb678 100644 --- a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java +++ b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java @@ -22,6 +22,8 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; @@ -115,7 +117,13 @@ public static String createJobName(String prefix, int randomChars) { /** Get raw job name (without prefix) from a jobName generated by createJobName. */ public static String extractJobName(String nameWithPrefix) { - return nameWithPrefix.substring(0, nameWithPrefix.lastIndexOf("-")); + Pattern pattern = Pattern.compile("-\\d{17}"); // yyyyMMddHHmmssSSS + Matcher matcher = pattern.matcher(nameWithPrefix); + if (matcher.find()) { + return nameWithPrefix.substring(0, matcher.start()); + } else { + throw new IllegalArgumentException("Unexpected job name: " + nameWithPrefix); + } } /* diff --git a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java index 316283cdf7d2..d2b8334964f5 100644 --- a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java +++ b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java @@ -51,4 +51,10 @@ public void testCreateExtractJobName() { String name = "create-job-name"; assertEquals(name, extractJobName(createJobName(name))); } + + @Test + public void testCreateExtractJobNameWithRandomChars() { + String name = "create-job-name"; + assertEquals(name, extractJobName(createJobName(name, 8))); + } } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java index 620d24d4e117..e0f9f6c2f63d 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java @@ -410,9 +410,9 @@ protected LaunchInfo.Builder getJobInfoBuilder(LaunchConfig options, JobState st // config pipelineName String pipelineName = PipelineUtils.extractJobName(options.jobName()); String overrideName = null; - if (pipelineName.endsWith("write")) { + if (pipelineName.startsWith("write")) { overrideName = System.getProperty(WRITE_PIPELINE_NAME_OVERWRITE); - } else if (pipelineName.endsWith("read")) { + } else if (pipelineName.startsWith("read")) { overrideName = System.getProperty(READ_PIPELINE_NAME_OVERWRITE); } if (!Strings.isNullOrEmpty(overrideName)) {