Skip to content

Commit

Permalink
Updates the Transform Service to accept Python extra packages through…
Browse files Browse the repository at this point in the history
… the Java API
  • Loading branch information
chamikaramj committed Oct 3, 2023
1 parent 5e38dec commit 479b15f
Show file tree
Hide file tree
Showing 11 changed files with 555 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService(
} else if (options.getTransformServiceBeamVersion() != null) {
String projectName = UUID.randomUUID().toString();
int port = findAvailablePort();
service = TransformServiceLauncher.forProject(projectName, port);
service = TransformServiceLauncher.forProject(projectName, port, null);
service.setBeamVersion(options.getTransformServiceBeamVersion());

// Starting the transform service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,20 @@ public OutputT expand(InputT input) {
boolean pythonAvailable = isPythonAvailable();
boolean dockerAvailable = isDockerAvailable();

File requirementsFile = null;
if (!extraPackages.isEmpty()) {
requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
}

// We use the transform service if either of the following is true.
// * It was explicitly requested.
// * Python executable is not available in the system but Docker is available.
Expand All @@ -514,19 +528,16 @@ public OutputT expand(InputT input) {
projectName,
port);

TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
String pythonRequirementsFile =
requirementsFile != null ? requirementsFile.getAbsolutePath() : null;
TransformServiceLauncher service =
TransformServiceLauncher.forProject(projectName, port, pythonRequirementsFile);
service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
// TODO(https://github.com/apache/beam/issues/26833): add support for installing extra
// packages.
if (!extraPackages.isEmpty()) {
throw new RuntimeException(
"Transform Service does not support installing extra packages yet");
}
try {
// Starting the transform service.
service.start();
// Waiting the service to be ready.
service.waitTillUp(15000);
service.waitTillUp(-1);
// Expanding the transform.
output = apply(input, String.format("localhost:%s", port), payload);
} finally {
Expand All @@ -539,17 +550,7 @@ public OutputT expand(InputT input) {
ImmutableList.Builder<String> args = ImmutableList.builder();
args.add(
"--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
if (requirementsFile != null) {
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
Expand Down
8 changes: 8 additions & 0 deletions sdks/java/transform-service/docker-compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@

BEAM_VERSION=$BEAM_VERSION
CREDENTIALS_VOLUME=$CREDENTIALS_VOLUME
DEPENDENCIES_VOLUME=$DEPENDENCIES_VOLUME

# A requirements file with either of the following
# * PyPi packages
# Locally available packages relative to the directory provided to
# DEPENDENCIES_VOLUME.
PYTHON_REQUIREMENTS_FILE_NAME=$PYTHON_REQUIREMENTS_FILE_NAME

GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME=application_default_credentials.json
COMPOSE_PROJECT_NAME=apache.beam.transform.service
TRANSFORM_SERVICE_PORT=$TRANSFORM_SERVICE_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ services:
expansion-service-2:
image: "apache/beam_python_expansion_service:${BEAM_VERSION}"
restart: on-failure
command: -id expansion-service-2 -port 5001
command: -id expansion-service-2 -port 5001 -requirements_file ${PYTHON_REQUIREMENTS_FILE_NAME} -dependencies_dir '/dependencies_volume'
volumes:
- ${CREDENTIALS_VOLUME}:/credentials_volume
- ${DEPENDENCIES_VOLUME}:/dependencies_volume
environment:
- GOOGLE_APPLICATION_CREDENTIALS=/credentials_volume/${GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME}
3 changes: 3 additions & 0 deletions sdks/java/transform-service/launcher/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ dependencies {
shadow library.java.args4j
shadow library.java.error_prone_annotations
permitUnusedDeclared(library.java.error_prone_annotations)
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation project(path: ":sdks:java:core")
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.transformservice.launcher;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -28,6 +30,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files;
Expand Down Expand Up @@ -62,9 +65,9 @@ public class TransformServiceLauncher {
private static final int STATUS_LOGGER_WAIT_TIME = 3000;

@SuppressWarnings("argument")
private TransformServiceLauncher(@Nullable String projectName, int port) throws IOException {
LOG.info("Initializing the Beam Transform Service {}.", projectName);

private TransformServiceLauncher(
@Nullable String projectName, int port, @Nullable String pythonRequirementsFile)
throws IOException {
String tmpDirLocation = System.getProperty("java.io.tmpdir");
// We use Docker Compose project name as the name of the temporary directory to isolate
// different transform service instances that may be running in the same machine.
Expand All @@ -83,6 +86,7 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
ByteStreams.copy(getClass().getResourceAsStream("/.env"), fout);
}

// Setting up the credentials directory.
File credentialsDir = Paths.get(tmpDir, "credentials_dir").toFile();
LOG.info(
"Creating a temporary directory for storing credentials: "
Expand Down Expand Up @@ -124,10 +128,84 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
}
}

// Setting up the dependencies directory.
File dependenciesDir = Paths.get(tmpDir, "dependencies_dir").toFile();
Path updatedRequirementsFilePath = Paths.get(dependenciesDir.toString(), "requirements.txt");
LOG.info(
"Creating a temporary directory for storing dependencies: "
+ dependenciesDir.getAbsolutePath());
if (dependenciesDir.exists()) {
LOG.info("Reusing the existing dependencies directory " + dependenciesDir.getAbsolutePath());
} else {
if (!dependenciesDir.mkdir()) {
throw new IOException(
"Could not create a temporary directory for storing dependencies: "
+ dependenciesDir.getAbsolutePath());
}

// We create a requirements file with extra dependencies.
// If there are no extra dependencies, we just provide an empty requirements file.
File file = updatedRequirementsFilePath.toFile();
if (!file.createNewFile()) {
throw new IOException(
"Could not create the new requirements file " + updatedRequirementsFilePath);
}

// Updating dependencies.
if (pythonRequirementsFile != null) {
Path requirementsFilePath = Paths.get(pythonRequirementsFile);
List<String> updatedLines = new ArrayList<>();

try (Stream<String> lines = java.nio.file.Files.lines(requirementsFilePath)) {
lines.forEachOrdered(
line -> {
Path dependencyFilePath = Paths.get(line);
if (java.nio.file.Files.exists(dependencyFilePath)) {
Path fileName = dependencyFilePath.getFileName();
if (fileName == null) {
throw new IllegalArgumentException(
"Could not determine the filename of the local artifact "
+ dependencyFilePath);
}
try {
java.nio.file.Files.copy(
dependencyFilePath,
Paths.get(dependenciesDir.toString(), fileName.toString()));
} catch (IOException e) {
throw new RuntimeException(e);
}
updatedLines.add(fileName.toString());
} else {
updatedLines.add(line);
}
});
}

try (BufferedWriter writer =
java.nio.file.Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
for (String line : updatedLines) {
writer.write(line);
writer.newLine();
}
writer.flush();
}
}
}

// Setting environment variables used by the docker-compose.yml file.
environmentVariables.put("CREDENTIALS_VOLUME", credentialsDir.getAbsolutePath());
environmentVariables.put("DEPENDENCIES_VOLUME", dependenciesDir.getAbsolutePath());
environmentVariables.put("TRANSFORM_SERVICE_PORT", String.valueOf(port));

Path updatedRequirementsFileName = updatedRequirementsFilePath.getFileName();
if (updatedRequirementsFileName == null) {
throw new IllegalArgumentException(
"Could not determine the file name of the updated requirements file "
+ updatedRequirementsFilePath);
}
environmentVariables.put(
"PYTHON_REQUIREMENTS_FILE_NAME", updatedRequirementsFileName.toString());

// Building the Docker Compose command.
dockerComposeStartCommandPrefix.add("docker-compose");
dockerComposeStartCommandPrefix.add("-p");
Expand All @@ -136,21 +214,37 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
dockerComposeStartCommandPrefix.add(dockerComposeFile.getAbsolutePath());
}

/**
* Specifies the Beam version to get containers for the transform service.
*
* <p>Could be a release Beam version with containers in Docker Hub or an unreleased Beam version
* for which containers are available locally.
*
* @param beamVersion a Beam version to get containers from.
*/
public void setBeamVersion(String beamVersion) {
environmentVariables.put("BEAM_VERSION", beamVersion);
}

public void setPythonExtraPackages(String pythonExtraPackages) {
environmentVariables.put("$PYTHON_EXTRA_PACKAGES", pythonExtraPackages);
}

/**
* Initializes a client for managing transform service instances.
*
* @param projectName project name for the transform service.
* @param port port exposed by the transform service.
* @param pythonRequirementsFile a requirements file with extra dependencies for the Python
* expansion services.
* @return an initialized client for managing the transform service.
* @throws IOException
*/
public static synchronized TransformServiceLauncher forProject(
@Nullable String projectName, int port) throws IOException {
@Nullable String projectName, int port, @Nullable String pythonRequirementsFile)
throws IOException {
if (projectName == null || projectName.isEmpty()) {
projectName = DEFAULT_PROJECT_NAME;
}
if (!launchers.containsKey(projectName)) {
launchers.put(projectName, new TransformServiceLauncher(projectName, port));
launchers.put(
projectName, new TransformServiceLauncher(projectName, port, pythonRequirementsFile));
}
return launchers.get(projectName);
}
Expand Down Expand Up @@ -238,6 +332,8 @@ private static class ArgConfig {
static final String PORT_ARG_NAME = "port";
static final String BEAM_VERSION_ARG_NAME = "beam_version";

static final String PYTHON_REQUIREMENTS_FILE_ARG_NAME = "python_requirements_file";

@Option(name = "--" + PROJECT_NAME_ARG_NAME, usage = "Docker compose project name")
private String projectName = "";

Expand All @@ -249,6 +345,11 @@ private static class ArgConfig {

@Option(name = "--" + BEAM_VERSION_ARG_NAME, usage = "Beam version to use.")
private String beamVersion = "";

@Option(
name = "--" + PYTHON_REQUIREMENTS_FILE_ARG_NAME,
usage = "Extra Python packages in the form of an requirements file.")
private String pythonRequirementsFile = "";
}

public static void main(String[] args) throws IOException, TimeoutException {
Expand Down Expand Up @@ -288,8 +389,12 @@ public static void main(String[] args) throws IOException, TimeoutException {
: ("port " + Integer.toString(config.port) + ".")));
System.out.println("===================================================");

String pythonRequirementsFile =
config.pythonRequirementsFile.isEmpty() ? config.pythonRequirementsFile : null;

TransformServiceLauncher service =
TransformServiceLauncher.forProject(config.projectName, config.port);
TransformServiceLauncher.forProject(
config.projectName, config.port, pythonRequirementsFile);
if (!config.beamVersion.isEmpty()) {
service.setBeamVersion(config.beamVersion);
}
Expand Down
Loading

0 comments on commit 479b15f

Please sign in to comment.