Skip to content

Commit

Permalink
Allow override beam version for PythonExternalTransform via pipeline …
Browse files Browse the repository at this point in the history
…option (apache#31691)
  • Loading branch information
Abacn authored and acrites committed Jul 17, 2024
1 parent 57cd847 commit d242957
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,10 @@ public OutputT expand(InputT input) {
OutputT output = null;
int port = PythonService.findAvailablePort();
PipelineOptionsFactory.register(PythonExternalTransformOptions.class);
boolean useTransformService =
input
.getPipeline()
.getOptions()
.as(PythonExternalTransformOptions.class)
.getUseTransformService();
PythonExternalTransformOptions options =
input.getPipeline().getOptions().as(PythonExternalTransformOptions.class);
boolean useTransformService = options.getUseTransformService();
@Nullable String customBeamRequirement = options.getCustomBeamRequirement();
boolean pythonAvailable = isPythonAvailable();
boolean dockerAvailable = isDockerAvailable();

Expand Down Expand Up @@ -557,6 +555,9 @@ public OutputT expand(InputT input) {
new PythonService(
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
if (!Strings.isNullOrEmpty(customBeamRequirement)) {
service = service.withCustomBeamRequirement(customBeamRequirement);
}
try (AutoCloseable p = service.start()) {
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,14 @@ public interface PythonExternalTransformOptions extends PipelineOptions {
boolean getUseTransformService();

void setUseTransformService(boolean useTransformService);

@Description("Custom Beam version for bootstrap Beam venv.")
String getCustomBeamRequirement();

/**
* Set custom Beam version for bootstrap Beam venv.
*
* <p>For example: 2.50.0rc1, "/path/to/apache-beam.whl"
*/
void setCustomBeamRequirement(String customBeamRequirement);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ public class PythonService {
private static final Logger LOG = LoggerFactory.getLogger(PythonService.class);

private final String module;
private String beamRequirement;
private final List<String> args;
private final List<String> extraPackages;

public PythonService(String module, List<String> args, List<String> extraPackages) {
this.module = module;
this.args = args;
this.extraPackages = extraPackages;
this.beamRequirement =
getMatchingStablePythonSDKVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
}

public PythonService(String module, List<String> args) {
Expand All @@ -72,6 +75,18 @@ public PythonService withExtraPackages(List<String> extraPackages) {
ImmutableList.<String>builder().addAll(this.extraPackages).addAll(extraPackages).build());
}

/**
* Override the Beam version to be installed in the service environment.
*
* @param customBeamRequirement the custom Beam requirement, can be a version (e.g. 2.57.0) or a
* path (e.g. /path/to/apache-beam.whl).
* @return this instance where Beam version overriden in place.
*/
public PythonService withCustomBeamRequirement(String customBeamRequirement) {
this.beamRequirement = customBeamRequirement;
return this;
}

@SuppressWarnings("argument")
public AutoCloseable start() throws IOException, InterruptedException {
File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py");
Expand All @@ -82,9 +97,7 @@ public AutoCloseable start() throws IOException, InterruptedException {
List<String> bootstrapCommand = new ArrayList<>();
bootstrapCommand.add(whichPython());
bootstrapCommand.add(bootstrapScript.getAbsolutePath());
bootstrapCommand.add(
"--beam_version="
+ getMatchingStablePythonSDKVersion(ReleaseInfo.getReleaseInfo().getSdkVersion()));
bootstrapCommand.add("--beam_version=" + beamRequirement);
if (!extraPackages.isEmpty()) {
bootstrapCommand.add("--extra_packages=" + String.join(";", extraPackages));
}
Expand Down

0 comments on commit d242957

Please sign in to comment.