Skip to content

Commit

Permalink
Add requirementsFile parameter to LaunchConfig. (#28732)
Browse files Browse the repository at this point in the history
* Add requirementsFile parameter to LaunchConfig.

* Install requirements in virtualenv for python jobs.
  • Loading branch information
pranavbhandari24 authored Oct 18, 2023
1 parent 4e6f10b commit 5fbf65a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class LaunchConfig {
private final @Nullable String specPath;
private final @Nullable Sdk sdk;
private final @Nullable String executable;
private final @Nullable String requirementsFile;
private final @Nullable Pipeline pipeline;

private LaunchConfig(Builder builder) {
Expand All @@ -130,6 +131,7 @@ private LaunchConfig(Builder builder) {
this.specPath = builder.specPath;
this.sdk = builder.sdk;
this.executable = builder.executable;
this.requirementsFile = builder.requirementsFile;
this.pipeline = builder.pipeline;
}

Expand Down Expand Up @@ -161,6 +163,10 @@ public ImmutableMap<String, Object> environment() {
return executable;
}

public @Nullable String requirementsFile() {
return requirementsFile;
}

public @Nullable Pipeline pipeline() {
return pipeline;
}
Expand All @@ -185,6 +191,7 @@ public static final class Builder {
private Map<String, String> parameters;
private Sdk sdk;
private String executable;
private String requirementsFile;
private Pipeline pipeline;

private Builder(String jobName, String specPath) {
Expand Down Expand Up @@ -243,6 +250,15 @@ public Builder setExecutable(String executable) {
return this;
}

public @Nullable String getRequirementsFile() {
return requirementsFile;
}

public Builder setRequirementsFile(String requirementsFile) {
this.requirementsFile = requirementsFile;
return this;
}

public @Nullable Pipeline getPipeline() {
return pipeline;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,22 @@ public LaunchInfo launch(String project, String region, LaunchConfig options) th
options.executable() != null,
"Cannot launch a dataflow job "
+ "without executable specified. Please specify executable and try again!");
if (options.requirementsFile() != null) {
// install requirements
cmd.add(
"virtualenv . && source ./bin/activate && pip3 install -r "
+ options.requirementsFile());
cmd.add("&&");
}
LOG.info("Using the executable at {}", options.executable());
cmd.add("python3");
cmd.add(options.executable());
cmd.addAll(extractOptions(project, region, options));
jobId = executeCommandAndParseResponse(cmd);
if (options.requirementsFile() != null) {
cmd.add("&&");
cmd.add("deactivate");
}
jobId = executeCommandAndParseResponse(String.join(" ", cmd));
break;
case GO:
checkState(
Expand All @@ -376,7 +387,7 @@ public LaunchInfo launch(String project, String region, LaunchConfig options) th
cmd.add("run");
cmd.add(options.executable());
cmd.addAll(extractOptions(project, region, options));
jobId = executeCommandAndParseResponse(cmd);
jobId = executeCommandAndParseResponse(String.join(" ", cmd));
break;
default:
throw new RuntimeException(
Expand Down Expand Up @@ -441,10 +452,13 @@ private List<String> extractOptions(String project, String region, LaunchConfig
}

/** Executes the specified command and parses the response to get the Job ID. */
private String executeCommandAndParseResponse(List<String> cmd) throws IOException {
Process process = new ProcessBuilder().command(cmd).redirectErrorStream(true).start();
private String executeCommandAndParseResponse(String cmd) throws IOException {
LOG.info("Running command: {}", cmd);
Process process =
new ProcessBuilder().command("/bin/bash", "-c", cmd).redirectErrorStream(true).start();
String output =
new String(ByteStreams.toByteArray(process.getInputStream()), StandardCharsets.UTF_8);
LOG.info(output);
Matcher m = JOB_ID_PATTERN.matcher(output);
if (!m.find()) {
throw new RuntimeException(
Expand Down

0 comments on commit 5fbf65a

Please sign in to comment.