diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index da3b3e7b6163..6f2953796fc7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -428,7 +428,11 @@ public Job translate(List packages) { environment.setDebugOptions(debugOptions); } - pipeline.traverseTopologically(this); + if (DataflowRunner.useUnifiedWorker(options)) { + LOG.info("Skipping v1 pipeline translation since this job will run on v2."); + } else { + pipeline.traverseTopologically(this); + } return job; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d7f9ba61442e..fc73429ee150 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1225,9 +1225,14 @@ public DataflowPipelineJob run(Pipeline pipeline) { DataflowPackage stagedPipeline = options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME); dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); - // Now rewrite things to be as needed for v1 (mutates the pipeline) - // This way the job submitted is valid for v1 and v2, simultaneously - replaceV1Transforms(pipeline); + + if (useUnifiedWorker(options)) { + LOG.info("Skipping v1 transform replacements since job will run on v2."); + } else { + // Now rewrite things to be as needed for v1 (mutates the pipeline) + // This way the job submitted is valid for v1 and v2, simultaneously + replaceV1Transforms(pipeline); + } // Capture the SdkComponents for look up during step translations SdkComponents dataflowV1Components = SdkComponents.create(); dataflowV1Components.registerEnvironment(