Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates the Transform Service to accept Python extra packages through the Java API #28783

Merged
merged 3 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:testing:test-utils:build")
dependsOn(":sdks:java:testing:tpcds:build")
dependsOn(":sdks:java:testing:watermarks:build")
dependsOn(":sdks:java:transform-service:build")
dependsOn(":sdks:java:transform-service:launcher:build")

dependsOn(":examples:java:preCommit")
dependsOn(":examples:java:twitter:preCommit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService(
} else if (options.getTransformServiceBeamVersion() != null) {
String projectName = UUID.randomUUID().toString();
int port = findAvailablePort();
service = TransformServiceLauncher.forProject(projectName, port);
service = TransformServiceLauncher.forProject(projectName, port, null);
service.setBeamVersion(options.getTransformServiceBeamVersion());

// Starting the transform service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,20 @@ public OutputT expand(InputT input) {
boolean pythonAvailable = isPythonAvailable();
boolean dockerAvailable = isDockerAvailable();

File requirementsFile = null;
if (!extraPackages.isEmpty()) {
requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
}

// We use the transform service if either of the following is true.
// * It was explicitly requested.
// * Python executable is not available in the system but Docker is available.
Expand All @@ -514,19 +528,16 @@ public OutputT expand(InputT input) {
projectName,
port);

TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
String pythonRequirementsFile =
requirementsFile != null ? requirementsFile.getAbsolutePath() : null;
TransformServiceLauncher service =
TransformServiceLauncher.forProject(projectName, port, pythonRequirementsFile);
service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
// TODO(https://github.com/apache/beam/issues/26833): add support for installing extra
// packages.
if (!extraPackages.isEmpty()) {
throw new RuntimeException(
"Transform Service does not support installing extra packages yet");
}
try {
// Starting the transform service.
service.start();
// Waiting for the service to be ready.
service.waitTillUp(15000);
service.waitTillUp(-1);
// Expanding the transform.
output = apply(input, String.format("localhost:%s", port), payload);
} finally {
Expand All @@ -539,17 +550,7 @@ public OutputT expand(InputT input) {
ImmutableList.Builder<String> args = ImmutableList.builder();
args.add(
"--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
if (requirementsFile != null) {
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
Expand Down
8 changes: 8 additions & 0 deletions sdks/java/transform-service/docker-compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@

BEAM_VERSION=$BEAM_VERSION
CREDENTIALS_VOLUME=$CREDENTIALS_VOLUME
DEPENDENCIES_VOLUME=$DEPENDENCIES_VOLUME

# A requirements file listing either of the following:
# * PyPI packages
# * Locally available packages, given relative to the directory provided to
#   DEPENDENCIES_VOLUME.
PYTHON_REQUIREMENTS_FILE_NAME=$PYTHON_REQUIREMENTS_FILE_NAME

GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME=application_default_credentials.json
COMPOSE_PROJECT_NAME=apache.beam.transform.service
TRANSFORM_SERVICE_PORT=$TRANSFORM_SERVICE_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ services:
expansion-service-2:
image: "apache/beam_python_expansion_service:${BEAM_VERSION}"
restart: on-failure
command: -id expansion-service-2 -port 5001
command: -id expansion-service-2 -port 5001 -requirements_file ${PYTHON_REQUIREMENTS_FILE_NAME} -dependencies_dir '/dependencies_volume'
volumes:
- ${CREDENTIALS_VOLUME}:/credentials_volume
- ${DEPENDENCIES_VOLUME}:/dependencies_volume
environment:
- GOOGLE_APPLICATION_CREDENTIALS=/credentials_volume/${GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME}
3 changes: 3 additions & 0 deletions sdks/java/transform-service/launcher/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ dependencies {
shadow library.java.args4j
shadow library.java.error_prone_annotations
permitUnusedDeclared(library.java.error_prone_annotations)
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation project(path: ":sdks:java:core")
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.transformservice.launcher;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -28,6 +30,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files;
Expand Down Expand Up @@ -62,9 +65,9 @@ public class TransformServiceLauncher {
private static final int STATUS_LOGGER_WAIT_TIME = 3000;

@SuppressWarnings("argument")
private TransformServiceLauncher(@Nullable String projectName, int port) throws IOException {
LOG.info("Initializing the Beam Transform Service {}.", projectName);

private TransformServiceLauncher(
@Nullable String projectName, int port, @Nullable String pythonRequirementsFile)
throws IOException {
String tmpDirLocation = System.getProperty("java.io.tmpdir");
// We use Docker Compose project name as the name of the temporary directory to isolate
// different transform service instances that may be running in the same machine.
Expand All @@ -83,14 +86,14 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
ByteStreams.copy(getClass().getResourceAsStream("/.env"), fout);
}

// Setting up the credentials directory.
File credentialsDir = Paths.get(tmpDir, "credentials_dir").toFile();
LOG.info(
"Creating a temporary directory for storing credentials: "
+ credentialsDir.getAbsolutePath());

if (credentialsDir.exists()) {
LOG.info("Reusing the existing credentials directory " + credentialsDir.getAbsolutePath());
} else {
LOG.info(
"Creating a temporary directory for storing credentials: "
+ credentialsDir.getAbsolutePath());
if (!credentialsDir.mkdir()) {
throw new IOException(
"Could not create a temporary directory for storing credentials: "
Expand Down Expand Up @@ -124,10 +127,84 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
}
}

// Setting up the dependencies directory.
File dependenciesDir = Paths.get(tmpDir, "dependencies_dir").toFile();
Path updatedRequirementsFilePath = Paths.get(dependenciesDir.toString(), "requirements.txt");
if (dependenciesDir.exists()) {
LOG.info("Reusing the existing dependencies directory " + dependenciesDir.getAbsolutePath());
} else {
LOG.info(
"Creating a temporary directory for storing dependencies: "
+ dependenciesDir.getAbsolutePath());
if (!dependenciesDir.mkdir()) {
throw new IOException(
"Could not create a temporary directory for storing dependencies: "
+ dependenciesDir.getAbsolutePath());
}

// We create a requirements file with extra dependencies.
// If there are no extra dependencies, we just provide an empty requirements file.
File file = updatedRequirementsFilePath.toFile();
if (!file.createNewFile()) {
throw new IOException(
"Could not create the new requirements file " + updatedRequirementsFilePath);
}

// Updating dependencies.
if (pythonRequirementsFile != null) {
Path requirementsFilePath = Paths.get(pythonRequirementsFile);
List<String> updatedLines = new ArrayList<>();

try (Stream<String> lines = java.nio.file.Files.lines(requirementsFilePath)) {
lines.forEachOrdered(
line -> {
Path dependencyFilePath = Paths.get(line);
if (java.nio.file.Files.exists(dependencyFilePath)) {
Path fileName = dependencyFilePath.getFileName();
if (fileName == null) {
throw new IllegalArgumentException(
"Could not determine the filename of the local artifact "
+ dependencyFilePath);
}
try {
java.nio.file.Files.copy(
dependencyFilePath,
Paths.get(dependenciesDir.toString(), fileName.toString()));
} catch (IOException e) {
throw new RuntimeException(e);
}
updatedLines.add(fileName.toString());
} else {
updatedLines.add(line);
}
});
}

try (BufferedWriter writer =
java.nio.file.Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
for (String line : updatedLines) {
writer.write(line);
writer.newLine();
}
writer.flush();
}
}
}

// Setting environment variables used by the docker-compose.yml file.
environmentVariables.put("CREDENTIALS_VOLUME", credentialsDir.getAbsolutePath());
environmentVariables.put("DEPENDENCIES_VOLUME", dependenciesDir.getAbsolutePath());
environmentVariables.put("TRANSFORM_SERVICE_PORT", String.valueOf(port));

Path updatedRequirementsFileName = updatedRequirementsFilePath.getFileName();
if (updatedRequirementsFileName == null) {
throw new IllegalArgumentException(
"Could not determine the file name of the updated requirements file "
+ updatedRequirementsFilePath);
}
environmentVariables.put(
"PYTHON_REQUIREMENTS_FILE_NAME", updatedRequirementsFileName.toString());

// Building the Docker Compose command.
dockerComposeStartCommandPrefix.add("docker-compose");
dockerComposeStartCommandPrefix.add("-p");
Expand All @@ -136,21 +213,37 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
dockerComposeStartCommandPrefix.add(dockerComposeFile.getAbsolutePath());
}

/**
 * Selects which Beam version's containers the transform service will use.
 *
 * <p>The value is recorded in this launcher's environment variables under {@code BEAM_VERSION}.
 * It may be a released Beam version whose containers are published to Docker Hub, or an
 * unreleased Beam version whose containers are available locally.
 *
 * @param beamVersion the Beam version to get containers from.
 */
public void setBeamVersion(String beamVersion) {
  final String versionVariable = "BEAM_VERSION";
  environmentVariables.put(versionVariable, beamVersion);
}

/**
 * Records extra Python packages in this launcher's environment variables under {@code
 * PYTHON_EXTRA_PACKAGES}.
 *
 * <p>NOTE(review): nothing visible in this change reads {@code PYTHON_EXTRA_PACKAGES}, and the
 * expected format of the value is not shown here; confirm against the consumer before relying
 * on this setter.
 *
 * @param pythonExtraPackages the value to store for the extra Python packages.
 */
public void setPythonExtraPackages(String pythonExtraPackages) {
  // Use the bare variable name as the key, like every other entry in environmentVariables
  // (e.g. "BEAM_VERSION", "CREDENTIALS_VOLUME"). The previous key carried a stray leading '$',
  // which made it a different name from the one a "$PYTHON_EXTRA_PACKAGES" reference resolves.
  environmentVariables.put("PYTHON_EXTRA_PACKAGES", pythonExtraPackages);
}

/**
* Initializes a client for managing transform service instances.
*
* @param projectName project name for the transform service.
* @param port port exposed by the transform service.
* @param pythonRequirementsFile a requirements file with extra dependencies for the Python
* expansion services.
* @return an initialized client for managing the transform service.
* @throws IOException if a new launcher must be created and its temporary directories or
*     files cannot be set up.
*/
public static synchronized TransformServiceLauncher forProject(
@Nullable String projectName, int port) throws IOException {
@Nullable String projectName, int port, @Nullable String pythonRequirementsFile)
throws IOException {
if (projectName == null || projectName.isEmpty()) {
projectName = DEFAULT_PROJECT_NAME;
}
if (!launchers.containsKey(projectName)) {
launchers.put(projectName, new TransformServiceLauncher(projectName, port));
launchers.put(
projectName, new TransformServiceLauncher(projectName, port, pythonRequirementsFile));
}
return launchers.get(projectName);
}
Expand Down Expand Up @@ -200,10 +293,10 @@ public synchronized void status() throws IOException {

public synchronized void waitTillUp(int timeout) throws IOException, TimeoutException {
timeout = timeout <= 0 ? DEFAULT_START_WAIT_TIME : timeout;
String statusFileName = getStatus();

long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < timeout) {
String statusFileName = getStatus();
try {
// We are just waiting for a local process. No need for exponential backoff.
this.wait(1000);
Expand All @@ -226,6 +319,7 @@ public synchronized void waitTillUp(int timeout) throws IOException, TimeoutExce

private synchronized String getStatus() throws IOException {
File outputOverride = File.createTempFile("output_override", null);
outputOverride.deleteOnExit();
runDockerComposeCommand(ImmutableList.of("ps"), outputOverride);

return outputOverride.getAbsolutePath();
Expand All @@ -238,6 +332,8 @@ private static class ArgConfig {
static final String PORT_ARG_NAME = "port";
static final String BEAM_VERSION_ARG_NAME = "beam_version";

static final String PYTHON_REQUIREMENTS_FILE_ARG_NAME = "python_requirements_file";

@Option(name = "--" + PROJECT_NAME_ARG_NAME, usage = "Docker compose project name")
private String projectName = "";

Expand All @@ -249,6 +345,11 @@ private static class ArgConfig {

@Option(name = "--" + BEAM_VERSION_ARG_NAME, usage = "Beam version to use.")
private String beamVersion = "";

@Option(
name = "--" + PYTHON_REQUIREMENTS_FILE_ARG_NAME,
usage = "Extra Python packages in the form of an requirements file.")
private String pythonRequirementsFile = "";
}

public static void main(String[] args) throws IOException, TimeoutException {
Expand Down Expand Up @@ -288,8 +389,12 @@ public static void main(String[] args) throws IOException, TimeoutException {
: ("port " + Integer.toString(config.port) + ".")));
System.out.println("===================================================");

String pythonRequirementsFile =
!config.pythonRequirementsFile.isEmpty() ? config.pythonRequirementsFile : null;

TransformServiceLauncher service =
TransformServiceLauncher.forProject(config.projectName, config.port);
TransformServiceLauncher.forProject(
config.projectName, config.port, pythonRequirementsFile);
if (!config.beamVersion.isEmpty()) {
service.setBeamVersion(config.beamVersion);
}
Expand Down
Loading
Loading