Skip to content

Commit

Permalink
Skip v1 translation steps for pipelines that will run on v2
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Mar 13, 2024
1 parent bf5551b commit 0a0186d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,11 @@ public Job translate(List<DataflowPackage> packages) {
environment.setDebugOptions(debugOptions);
}

pipeline.traverseTopologically(this);
if (DataflowRunner.useUnifiedWorker(options)) {
LOG.info("Skipping v1 pipeline translation since this job will run on v2.");
} else {
pipeline.traverseTopologically(this);
}

return job;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1225,9 +1225,14 @@ public DataflowPipelineJob run(Pipeline pipeline) {
DataflowPackage stagedPipeline =
options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
// Now rewrite things to be as needed for v1 (mutates the pipeline)
// This way the job submitted is valid for v1 and v2, simultaneously
replaceV1Transforms(pipeline);

if (useUnifiedWorker(options)) {
LOG.info("Skipping v1 transform replacements since job will run on v2.");
} else {
// Now rewrite things to be as needed for v1 (mutates the pipeline)
// This way the job submitted is valid for v1 and v2, simultaneously
replaceV1Transforms(pipeline);
}
// Capture the SdkComponents for look up during step translations
SdkComponents dataflowV1Components = SdkComponents.create();
dataflowV1Components.registerEnvironment(
Expand Down

0 comments on commit 0a0186d

Please sign in to comment.