diff --git a/build.gradle.kts b/build.gradle.kts index a4a9a09e504f..c11779421cb8 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -310,6 +310,8 @@ tasks.register("javaPreCommit") { dependsOn(":sdks:java:testing:test-utils:build") dependsOn(":sdks:java:testing:tpcds:build") dependsOn(":sdks:java:testing:watermarks:build") + dependsOn(":sdks:java:transform-service:build") + dependsOn(":sdks:java:transform-service:launcher:build") dependsOn(":examples:java:preCommit") dependsOn(":examples:java:twitter:preCommit") diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java index d657bb31b184..5e1609f27a39 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java @@ -108,7 +108,7 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService( } else if (options.getTransformServiceBeamVersion() != null) { String projectName = UUID.randomUUID().toString(); int port = findAvailablePort(); - service = TransformServiceLauncher.forProject(projectName, port); + service = TransformServiceLauncher.forProject(projectName, port, null); service.setBeamVersion(options.getTransformServiceBeamVersion()); // Starting the transform service. diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java index 4a5f4f12a07a..5ba3484964c1 100644 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java @@ -495,6 +495,20 @@ public OutputT expand(InputT input) { boolean pythonAvailable = isPythonAvailable(); boolean dockerAvailable = isDockerAvailable(); + File requirementsFile = null; + if (!extraPackages.isEmpty()) { + requirementsFile = File.createTempFile("requirements", ".txt"); + requirementsFile.deleteOnExit(); + try (Writer fout = + new OutputStreamWriter( + new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) { + for (String pkg : extraPackages) { + fout.write(pkg); + fout.write('\n'); + } + } + } + // We use the transform service if either of the following is true. // * It was explicitly requested. // * Python executable is not available in the system but Docker is available. @@ -514,19 +528,16 @@ public OutputT expand(InputT input) { projectName, port); - TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port); + String pythonRequirementsFile = + requirementsFile != null ? requirementsFile.getAbsolutePath() : null; + TransformServiceLauncher service = + TransformServiceLauncher.forProject(projectName, port, pythonRequirementsFile); service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion()); - // TODO(https://github.com/apache/beam/issues/26833): add support for installing extra - // packages. - if (!extraPackages.isEmpty()) { - throw new RuntimeException( - "Transform Service does not support installing extra packages yet"); - } try { // Starting the transform service. service.start(); // Waiting the service to be ready. - service.waitTillUp(15000); + service.waitTillUp(-1); // Expanding the transform. output = apply(input, String.format("localhost:%s", port), payload); } finally { @@ -539,17 +550,7 @@ public OutputT expand(InputT input) { ImmutableList.Builder args = ImmutableList.builder(); args.add( "--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle"); - if (!extraPackages.isEmpty()) { - File requirementsFile = File.createTempFile("requirements", ".txt"); - requirementsFile.deleteOnExit(); - try (Writer fout = - new OutputStreamWriter( - new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) { - for (String pkg : extraPackages) { - fout.write(pkg); - fout.write('\n'); - } - } + if (requirementsFile != null) { args.add("--requirements_file=" + requirementsFile.getAbsolutePath()); } PythonService service = diff --git a/sdks/java/transform-service/docker-compose/.env b/sdks/java/transform-service/docker-compose/.env index 5de5982cfa30..ed27b267fed3 100644 --- a/sdks/java/transform-service/docker-compose/.env +++ b/sdks/java/transform-service/docker-compose/.env @@ -12,6 +12,14 @@ BEAM_VERSION=$BEAM_VERSION CREDENTIALS_VOLUME=$CREDENTIALS_VOLUME +DEPENDENCIES_VOLUME=$DEPENDENCIES_VOLUME + +# A requirements file with either of the following +# * PyPi packages +# * Locally available packages relative to the directory provided to +# DEPENDENCIES_VOLUME. +PYTHON_REQUIREMENTS_FILE_NAME=$PYTHON_REQUIREMENTS_FILE_NAME + GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME=application_default_credentials.json COMPOSE_PROJECT_NAME=apache.beam.transform.service TRANSFORM_SERVICE_PORT=$TRANSFORM_SERVICE_PORT diff --git a/sdks/java/transform-service/docker-compose/docker-compose.yml b/sdks/java/transform-service/docker-compose/docker-compose.yml index b685be10a329..39235533b9a8 100644 --- a/sdks/java/transform-service/docker-compose/docker-compose.yml +++ b/sdks/java/transform-service/docker-compose/docker-compose.yml @@ -32,8 +32,9 @@ services: expansion-service-2: image: "apache/beam_python_expansion_service:${BEAM_VERSION}" restart: on-failure - command: -id expansion-service-2 -port 5001 + command: -id expansion-service-2 -port 5001 -requirements_file ${PYTHON_REQUIREMENTS_FILE_NAME} -dependencies_dir '/dependencies_volume' volumes: - ${CREDENTIALS_VOLUME}:/credentials_volume + - ${DEPENDENCIES_VOLUME}:/dependencies_volume environment: - GOOGLE_APPLICATION_CREDENTIALS=/credentials_volume/${GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME} diff --git a/sdks/java/transform-service/launcher/build.gradle b/sdks/java/transform-service/launcher/build.gradle index 83c5d60a1ef1..0952f37109eb 100644 --- a/sdks/java/transform-service/launcher/build.gradle +++ b/sdks/java/transform-service/launcher/build.gradle @@ -45,6 +45,9 @@ dependencies { shadow library.java.args4j shadow library.java.error_prone_annotations permitUnusedDeclared(library.java.error_prone_annotations) + testImplementation library.java.junit + testImplementation library.java.mockito_core + testImplementation project(path: ":sdks:java:core") } sourceSets { diff --git a/sdks/java/transform-service/launcher/src/main/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncher.java b/sdks/java/transform-service/launcher/src/main/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncher.java index f52fdfed710d..c0a9097a762f 100644 --- a/sdks/java/transform-service/launcher/src/main/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncher.java +++ b/sdks/java/transform-service/launcher/src/main/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncher.java @@ -17,9 +17,11 @@ */ package org.apache.beam.sdk.transformservice.launcher; +import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files; @@ -62,9 +65,9 @@ public class TransformServiceLauncher { private static final int STATUS_LOGGER_WAIT_TIME = 3000; @SuppressWarnings("argument") - private TransformServiceLauncher(@Nullable String projectName, int port) throws IOException { - LOG.info("Initializing the Beam Transform Service {}.", projectName); - + private TransformServiceLauncher( + @Nullable String projectName, int port, @Nullable String pythonRequirementsFile) + throws IOException { String tmpDirLocation = System.getProperty("java.io.tmpdir"); // We use Docker Compose project name as the name of the temporary directory to isolate // different transform service instances that may be running in the same machine. @@ -83,14 +86,14 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws ByteStreams.copy(getClass().getResourceAsStream("/.env"), fout); } + // Setting up the credentials directory. File credentialsDir = Paths.get(tmpDir, "credentials_dir").toFile(); - LOG.info( - "Creating a temporary directory for storing credentials: " - + credentialsDir.getAbsolutePath()); - if (credentialsDir.exists()) { LOG.info("Reusing the existing credentials directory " + credentialsDir.getAbsolutePath()); } else { + LOG.info( + "Creating a temporary directory for storing credentials: " + + credentialsDir.getAbsolutePath()); if (!credentialsDir.mkdir()) { throw new IOException( "Could not create a temporary directory for storing credentials: " @@ -124,10 +127,84 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws } } + // Setting up the dependencies directory. + File dependenciesDir = Paths.get(tmpDir, "dependencies_dir").toFile(); + Path updatedRequirementsFilePath = Paths.get(dependenciesDir.toString(), "requirements.txt"); + if (dependenciesDir.exists()) { + LOG.info("Reusing the existing dependencies directory " + dependenciesDir.getAbsolutePath()); + } else { + LOG.info( + "Creating a temporary directory for storing dependencies: " + + dependenciesDir.getAbsolutePath()); + if (!dependenciesDir.mkdir()) { + throw new IOException( + "Could not create a temporary directory for storing dependencies: " + + dependenciesDir.getAbsolutePath()); + } + + // We create a requirements file with extra dependencies. + // If there are no extra dependencies, we just provide an empty requirements file. + File file = updatedRequirementsFilePath.toFile(); + if (!file.createNewFile()) { + throw new IOException( + "Could not create the new requirements file " + updatedRequirementsFilePath); + } + + // Updating dependencies. + if (pythonRequirementsFile != null) { + Path requirementsFilePath = Paths.get(pythonRequirementsFile); + List updatedLines = new ArrayList<>(); + + try (Stream lines = java.nio.file.Files.lines(requirementsFilePath)) { + lines.forEachOrdered( + line -> { + Path dependencyFilePath = Paths.get(line); + if (java.nio.file.Files.exists(dependencyFilePath)) { + Path fileName = dependencyFilePath.getFileName(); + if (fileName == null) { + throw new IllegalArgumentException( + "Could not determine the filename of the local artifact " + + dependencyFilePath); + } + try { + java.nio.file.Files.copy( + dependencyFilePath, + Paths.get(dependenciesDir.toString(), fileName.toString())); + } catch (IOException e) { + throw new RuntimeException(e); + } + updatedLines.add(fileName.toString()); + } else { + updatedLines.add(line); + } + }); + } + + try (BufferedWriter writer = + java.nio.file.Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { + for (String line : updatedLines) { + writer.write(line); + writer.newLine(); + } + writer.flush(); + } + } + } + // Setting environment variables used by the docker-compose.yml file. environmentVariables.put("CREDENTIALS_VOLUME", credentialsDir.getAbsolutePath()); + environmentVariables.put("DEPENDENCIES_VOLUME", dependenciesDir.getAbsolutePath()); environmentVariables.put("TRANSFORM_SERVICE_PORT", String.valueOf(port)); + Path updatedRequirementsFileName = updatedRequirementsFilePath.getFileName(); + if (updatedRequirementsFileName == null) { + throw new IllegalArgumentException( + "Could not determine the file name of the updated requirements file " + + updatedRequirementsFilePath); + } + environmentVariables.put( + "PYTHON_REQUIREMENTS_FILE_NAME", updatedRequirementsFileName.toString()); + // Building the Docker Compose command. dockerComposeStartCommandPrefix.add("docker-compose"); dockerComposeStartCommandPrefix.add("-p"); @@ -136,21 +213,37 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws dockerComposeStartCommandPrefix.add(dockerComposeFile.getAbsolutePath()); } + /** + * Specifies the Beam version to get containers for the transform service. + * + *

Could be a release Beam version with containers in Docker Hub or an unreleased Beam version + * for which containers are available locally. + * + * @param beamVersion a Beam version to get containers from. + */ public void setBeamVersion(String beamVersion) { environmentVariables.put("BEAM_VERSION", beamVersion); } - public void setPythonExtraPackages(String pythonExtraPackages) { - environmentVariables.put("$PYTHON_EXTRA_PACKAGES", pythonExtraPackages); - } - + /** + * Initializes a client for managing transform service instances. + * + * @param projectName project name for the transform service. + * @param port port exposed by the transform service. + * @param pythonRequirementsFile a requirements file with extra dependencies for the Python + * expansion services. + * @return an initialized client for managing the transform service. + * @throws IOException + */ public static synchronized TransformServiceLauncher forProject( - @Nullable String projectName, int port) throws IOException { + @Nullable String projectName, int port, @Nullable String pythonRequirementsFile) + throws IOException { if (projectName == null || projectName.isEmpty()) { projectName = DEFAULT_PROJECT_NAME; } if (!launchers.containsKey(projectName)) { - launchers.put(projectName, new TransformServiceLauncher(projectName, port)); + launchers.put( + projectName, new TransformServiceLauncher(projectName, port, pythonRequirementsFile)); } return launchers.get(projectName); } @@ -200,10 +293,10 @@ public synchronized void status() throws IOException { public synchronized void waitTillUp(int timeout) throws IOException, TimeoutException { timeout = timeout <= 0 ? DEFAULT_START_WAIT_TIME : timeout; - String statusFileName = getStatus(); long startTime = System.currentTimeMillis(); while (System.currentTimeMillis() - startTime < timeout) { + String statusFileName = getStatus(); try { // We are just waiting for a local process. No need for exponential backoff. this.wait(1000); @@ -226,6 +319,7 @@ public synchronized void waitTillUp(int timeout) throws IOException, TimeoutExce private synchronized String getStatus() throws IOException { File outputOverride = File.createTempFile("output_override", null); + outputOverride.deleteOnExit(); runDockerComposeCommand(ImmutableList.of("ps"), outputOverride); return outputOverride.getAbsolutePath(); @@ -238,6 +332,8 @@ private static class ArgConfig { static final String PORT_ARG_NAME = "port"; static final String BEAM_VERSION_ARG_NAME = "beam_version"; + static final String PYTHON_REQUIREMENTS_FILE_ARG_NAME = "python_requirements_file"; + @Option(name = "--" + PROJECT_NAME_ARG_NAME, usage = "Docker compose project name") private String projectName = ""; @@ -249,6 +345,11 @@ private static class ArgConfig { @Option(name = "--" + BEAM_VERSION_ARG_NAME, usage = "Beam version to use.") private String beamVersion = ""; + + @Option( + name = "--" + PYTHON_REQUIREMENTS_FILE_ARG_NAME, + usage = "Extra Python packages in the form of an requirements file.") + private String pythonRequirementsFile = ""; } public static void main(String[] args) throws IOException, TimeoutException { @@ -288,8 +389,12 @@ public static void main(String[] args) throws IOException, TimeoutException { : ("port " + Integer.toString(config.port) + "."))); System.out.println("==================================================="); + String pythonRequirementsFile = + !config.pythonRequirementsFile.isEmpty() ? config.pythonRequirementsFile : null; + TransformServiceLauncher service = - TransformServiceLauncher.forProject(config.projectName, config.port); + TransformServiceLauncher.forProject( + config.projectName, config.port, pythonRequirementsFile); if (!config.beamVersion.isEmpty()) { service.setBeamVersion(config.beamVersion); } diff --git a/sdks/java/transform-service/launcher/src/test/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncherTest.java b/sdks/java/transform-service/launcher/src/test/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncherTest.java new file mode 100644 index 000000000000..4ef84b02061b --- /dev/null +++ b/sdks/java/transform-service/launcher/src/test/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncherTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transformservice.launcher; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.UUID; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TransformServiceLauncherTest { + + @Test + public void testLauncherCreatesCredentialsDir() throws IOException { + String projectName = UUID.randomUUID().toString(); + Path expectedTempDir = Paths.get(System.getProperty("java.io.tmpdir"), projectName); + File file = expectedTempDir.toFile(); + file.deleteOnExit(); + TransformServiceLauncher.forProject(projectName, 12345, null); + Path expectedCredentialsDir = Paths.get(expectedTempDir.toString(), "credentials_dir"); + assertTrue(expectedCredentialsDir.toFile().exists()); + } + + @Test + public void testLauncherCreatesDependenciesDir() throws IOException { + String projectName = UUID.randomUUID().toString(); + Path expectedTempDir = Paths.get(System.getProperty("java.io.tmpdir"), projectName); + File file = expectedTempDir.toFile(); + file.deleteOnExit(); + TransformServiceLauncher.forProject(projectName, 12345, null); + Path expectedCredentialsDir = Paths.get(expectedTempDir.toString(), "dependencies_dir"); + assertTrue(expectedCredentialsDir.toFile().exists()); + } + + @Test + public void testLauncherInstallsDependencies() throws IOException { + String projectName = UUID.randomUUID().toString(); + Path expectedTempDir = Paths.get(System.getProperty("java.io.tmpdir"), projectName); + File file = expectedTempDir.toFile(); + file.deleteOnExit(); + + File requirementsFile = + Paths.get( + System.getProperty("java.io.tmpdir"), + ("requirements" + UUID.randomUUID().toString() + ".txt")) + .toFile(); + requirementsFile.deleteOnExit(); + + try (Writer fout = + new OutputStreamWriter( + new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) { + fout.write("pypipackage1\n"); + fout.write("pypipackage2\n"); + } + + TransformServiceLauncher.forProject(projectName, 12345, requirementsFile.getAbsolutePath()); + + // Confirming that the Transform Service launcher created a temporary requirements file with the + // specified set of packages. + Path expectedUpdatedRequirementsFile = + Paths.get(expectedTempDir.toString(), "dependencies_dir", "requirements.txt"); + assertTrue(expectedUpdatedRequirementsFile.toFile().exists()); + + ArrayList expectedUpdatedRequirementsFileLines = new ArrayList<>(); + try (BufferedReader bufReader = + Files.newBufferedReader(expectedUpdatedRequirementsFile, UTF_8)) { + String line = bufReader.readLine(); + while (line != null) { + expectedUpdatedRequirementsFileLines.add(line); + line = bufReader.readLine(); + } + } + + assertEquals(2, expectedUpdatedRequirementsFileLines.size()); + assertTrue(expectedUpdatedRequirementsFileLines.contains("pypipackage1")); + assertTrue(expectedUpdatedRequirementsFileLines.contains("pypipackage2")); + } + + @Test + public void testLauncherInstallsLocalDependencies() throws IOException { + String projectName = UUID.randomUUID().toString(); + Path expectedTempDir = Paths.get(System.getProperty("java.io.tmpdir"), projectName); + File file = expectedTempDir.toFile(); + file.deleteOnExit(); + + String dependency1FileName = "dep_" + UUID.randomUUID().toString(); + File dependency1 = + Paths.get(System.getProperty("java.io.tmpdir"), dependency1FileName).toFile(); + dependency1.deleteOnExit(); + try (Writer fout = + new OutputStreamWriter( + new FileOutputStream(dependency1.getAbsolutePath()), Charsets.UTF_8)) { + fout.write("tempdata\n"); + } + + String dependency2FileName = "dep_" + UUID.randomUUID().toString(); + File dependency2 = + Paths.get(System.getProperty("java.io.tmpdir"), dependency2FileName).toFile(); + dependency2.deleteOnExit(); + try (Writer fout = + new OutputStreamWriter( + new FileOutputStream(dependency2.getAbsolutePath()), Charsets.UTF_8)) { + fout.write("tempdata\n"); + } + + File requirementsFile = + Paths.get( + System.getProperty("java.io.tmpdir"), + ("requirements" + UUID.randomUUID().toString() + ".txt")) + .toFile(); + requirementsFile.deleteOnExit(); + try (Writer fout = + new OutputStreamWriter( + new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) { + fout.write(dependency1.getAbsolutePath() + "\n"); + fout.write(dependency2.getAbsolutePath() + "\n"); + fout.write("pypipackage" + "\n"); + } + + TransformServiceLauncher.forProject(projectName, 12345, requirementsFile.getAbsolutePath()); + + // Confirming that the Transform Service launcher created a temporary requirements file with the + // specified set of packages. + Path expectedUpdatedRequirementsFile = + Paths.get(expectedTempDir.toString(), "dependencies_dir", "requirements.txt"); + assertTrue(expectedUpdatedRequirementsFile.toFile().exists()); + + ArrayList expectedUpdatedRequirementsFileLines = new ArrayList<>(); + try (BufferedReader bufReader = + Files.newBufferedReader(expectedUpdatedRequirementsFile, UTF_8)) { + String line = bufReader.readLine(); + while (line != null) { + expectedUpdatedRequirementsFileLines.add(line); + line = bufReader.readLine(); + } + } + + // To make local packages available to the expansion service Docker containers, the temporary + // requirements file should contain names of the local packages relative to the dependencies + // volume and local packages should have been copied to the dependencies volume. + assertEquals(3, expectedUpdatedRequirementsFileLines.size()); + assertTrue(expectedUpdatedRequirementsFileLines.contains(dependency1FileName)); + assertTrue(expectedUpdatedRequirementsFileLines.contains(dependency2FileName)); + assertTrue(expectedUpdatedRequirementsFileLines.contains("pypipackage")); + + assertTrue( + Paths.get(expectedTempDir.toString(), "dependencies_dir", dependency1FileName) + .toFile() + .exists()); + assertTrue( + Paths.get(expectedTempDir.toString(), "dependencies_dir", dependency2FileName) + .toFile() + .exists()); + } +} diff --git a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/ExpansionService.java b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/ExpansionService.java index 17fe5472f9fc..0a2e65099e7d 100644 --- a/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/ExpansionService.java +++ b/sdks/java/transform-service/src/main/java/org/apache/beam/sdk/transformservice/ExpansionService.java @@ -17,15 +17,22 @@ */ package org.apache.beam.sdk.transformservice; +import java.io.IOException; +import java.net.Socket; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; import org.apache.beam.model.expansion.v1.ExpansionApi; +import org.apache.beam.model.expansion.v1.ExpansionApi.ExpansionResponse; import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.runners.core.construction.DefaultExpansionServiceClientFactory; import org.apache.beam.runners.core.construction.ExpansionServiceClientFactory; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannelBuilder; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; import org.checkerframework.checker.nullness.qual.Nullable; @@ -40,6 +47,12 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB final List endpoints; + private boolean checkedAllServices = false; + + private static final long SERVICE_CHECK_TIMEOUT_MILLIS = 60000; + + private boolean disableServiceCheck = false; + ExpansionService( List endpoints, @Nullable ExpansionServiceClientFactory clientFactory) { @@ -48,10 +61,65 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB clientFactory != null ? clientFactory : DEFAULT_EXPANSION_SERVICE_CLIENT_FACTORY; } + // Waits till all expansion services are ready. + private void waitForAllServicesToBeReady() throws TimeoutException { + if (disableServiceCheck) { + // Service check disabled. Just returning. + return; + } + + outer: + for (Endpoints.ApiServiceDescriptor endpoint : endpoints) { + long start = System.currentTimeMillis(); + long duration = 10; + while (System.currentTimeMillis() - start < SERVICE_CHECK_TIMEOUT_MILLIS) { + try { + String url = endpoint.getUrl(); + int portIndex = url.lastIndexOf(":"); + if (portIndex <= 0) { + throw new RuntimeException( + "Expected the endpoint to be of the form : but received " + url); + } + int port = Integer.parseInt(url.substring(portIndex + 1)); + String host = url.substring(0, portIndex); + new Socket(host, port).close(); + // Current service is up. Checking the next one. + continue outer; + } catch (IOException exn) { + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + // Ignore + } + duration = (long) (duration * 1.2); + } + } + throw new TimeoutException( + "Timeout waiting for the service " + + endpoint.getUrl() + + " to startup after " + + (System.currentTimeMillis() - start) + + " milliseconds."); + } + } + + @VisibleForTesting + void disableServiceCheck() { + disableServiceCheck = true; + } + @Override public void expand( ExpansionApi.ExpansionRequest request, StreamObserver responseObserver) { + if (!checkedAllServices) { + try { + waitForAllServicesToBeReady(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + checkedAllServices = true; + } try { responseObserver.onNext(processExpand(request)); responseObserver.onCompleted(); @@ -68,6 +136,14 @@ public void expand( public void discoverSchemaTransform( ExpansionApi.DiscoverSchemaTransformRequest request, StreamObserver responseObserver) { + if (!checkedAllServices) { + try { + waitForAllServicesToBeReady(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + checkedAllServices = true; + } try { responseObserver.onNext(processDiscover(request)); responseObserver.onCompleted(); @@ -80,18 +156,41 @@ public void discoverSchemaTransform( } } - /*package*/ ExpansionApi.ExpansionResponse processExpand(ExpansionApi.ExpansionRequest request) { + private ExpansionApi.ExpansionResponse getAggregatedErrorResponse( + Map errorResponses) { + StringBuilder errorMessageBuilder = new StringBuilder(); + + errorMessageBuilder.append( + "Aggregated errors from " + errorResponses.size() + " expansion services." + "\n"); + for (Map.Entry entry : errorResponses.entrySet()) { + errorMessageBuilder.append( + "Error from expansion service " + + entry.getKey() + + ": " + + entry.getValue().getError() + + "\n"); + } + + return errorResponses + .values() + .iterator() + .next() + .toBuilder() + .setError(errorMessageBuilder.toString()) + .build(); + } + + ExpansionApi.ExpansionResponse processExpand(ExpansionApi.ExpansionRequest request) { // Trying out expansion services in order till one succeeds. // If all services fail, re-raises the last error. - // TODO: when all services fail, return an aggregated error with errors from all services. - ExpansionApi.ExpansionResponse lastErrorResponse = null; + Map errorResponses = new HashMap<>(); RuntimeException lastException = null; for (Endpoints.ApiServiceDescriptor endpoint : endpoints) { try { ExpansionApi.ExpansionResponse response = expansionServiceClientFactory.getExpansionServiceClient(endpoint).expand(request); if (!response.getError().isEmpty()) { - lastErrorResponse = response; + errorResponses.put(endpoint.getUrl(), response); continue; } return response; @@ -99,8 +198,11 @@ public void discoverSchemaTransform( lastException = e; } } - if (lastErrorResponse != null) { - return lastErrorResponse; + if (lastException != null) { + throw new RuntimeException("Expansion request to transform service failed.", lastException); + } + if (!errorResponses.isEmpty()) { + return getAggregatedErrorResponse(errorResponses); } else if (lastException != null) { throw new RuntimeException("Expansion request to transform service failed.", lastException); } else { diff --git a/sdks/java/transform-service/src/test/java/org/apache/beam/sdk/transformservice/ExpansionServiceTest.java b/sdks/java/transform-service/src/test/java/org/apache/beam/sdk/transformservice/ExpansionServiceTest.java index 298bce87f901..9905abd1d9ba 100644 --- a/sdks/java/transform-service/src/test/java/org/apache/beam/sdk/transformservice/ExpansionServiceTest.java +++ b/sdks/java/transform-service/src/test/java/org/apache/beam/sdk/transformservice/ExpansionServiceTest.java @@ -60,6 +60,8 @@ public void setUp() throws Exception { endpoints.add(endpoint2); clientFactory = Mockito.mock(ExpansionServiceClientFactory.class); expansionService = new ExpansionService(endpoints, clientFactory); + // We do not run actual services in unit tests. + expansionService.disableServiceCheck(); } @Test @@ -131,7 +133,10 @@ public void testExpandFail() { ArgumentCaptor expansionResponseCapture = ArgumentCaptor.forClass(ExpansionResponse.class); Mockito.verify(responseObserver).onNext(expansionResponseCapture.capture()); - assertEquals("expansion error 2", expansionResponseCapture.getValue().getError()); + + // Error response should contain errors from both expansion services. + assertTrue(expansionResponseCapture.getValue().getError().contains("expansion error 1")); + assertTrue(expansionResponseCapture.getValue().getError().contains("expansion error 2")); } @Test diff --git a/sdks/python/apache_beam/utils/transform_service_launcher.py b/sdks/python/apache_beam/utils/transform_service_launcher.py index 33feab9bf29c..ac492513aba5 100644 --- a/sdks/python/apache_beam/utils/transform_service_launcher.py +++ b/sdks/python/apache_beam/utils/transform_service_launcher.py @@ -86,6 +86,7 @@ def __init__(self, project_name, port, beam_version=None): compose_file = os.path.join(temp_dir, 'docker-compose.yml') + # Creating the credentials volume. credentials_dir = os.path.join(temp_dir, 'credentials_dir') if not os.path.exists(credentials_dir): os.mkdir(credentials_dir) @@ -111,11 +112,24 @@ def __init__(self, project_name, port, beam_version=None): 'credentials file at the expected location %s.' % application_default_path_file) + # Creating the dependencies volume. + dependencies_dir = os.path.join(temp_dir, 'dependencies_dir') + if not os.path.exists(dependencies_dir): + os.mkdir(dependencies_dir) + self._environmental_variables = {} self._environmental_variables['CREDENTIALS_VOLUME'] = credentials_dir + self._environmental_variables['DEPENDENCIES_VOLUME'] = dependencies_dir self._environmental_variables['TRANSFORM_SERVICE_PORT'] = str(port) self._environmental_variables['BEAM_VERSION'] = beam_version + # Setting an empty requirements file + requirements_file_name = os.path.join(dependencies_dir, 'requirements.txt') + with open(requirements_file_name, 'w') as _: + pass + self._environmental_variables['PYTHON_REQUIREMENTS_FILE_NAME'] = ( + 'requirements.txt') + self._docker_compose_start_command_prefix = [] self._docker_compose_start_command_prefix.append('docker-compose') self._docker_compose_start_command_prefix.append('-p') diff --git a/sdks/python/expansion-service-container/boot.go b/sdks/python/expansion-service-container/boot.go index 90a97c35425a..ba56b349c4ea 100644 --- a/sdks/python/expansion-service-container/boot.go +++ b/sdks/python/expansion-service-container/boot.go @@ -18,8 +18,10 @@ package main import ( + "bufio" "flag" "fmt" + "io/ioutil" "log" "os" "path/filepath" @@ -31,16 +33,15 @@ import ( ) var ( - id = flag.String("id", "", "Local identifier (required)") - port = flag.Int("port", 0, "Port for the expansion service (required)") + id = flag.String("id", "", "Local identifier (required)") + port = flag.Int("port", 0, "Port for the expansion service (required)") + requirements_file = flag.String("requirements_file", "", "A requirement file with extra packages to be made available to the transforms being expanded. Path should be relative to the 'dependencies_dir'") + dependencies_dir = flag.String("dependencies_dir", "", "A directory that stores locally available extra packages.") ) const ( expansionServiceEntrypoint = "apache_beam.runners.portability.expansion_service_main" venvDirectory = "beam_venv" // This should match the venv directory name used in the Dockerfile. - requirementsFile = "requirements.txt" - beamSDKArtifact = "apache-beam-sdk.tar.gz" - beamSDKOptions = "[gcp,dataframe]" ) func main() { @@ -58,6 +59,79 @@ func main() { } } +func getLines(fileNameToRead string) ([]string, error) { + fileToRead, err := os.Open(fileNameToRead) + if err != nil { + return nil, err + } + defer fileToRead.Close() + + sc := bufio.NewScanner(fileToRead) + lines := make([]string, 0) + + // Read through 'tokens' until an EOF is encountered. + for sc.Scan() { + lines = append(lines, sc.Text()) + } + + if err := sc.Err(); err != nil { + return nil, err + } + return lines, nil +} + +func installExtraPackages(requirementsFile string) error { + extraPackages, err := getLines(requirementsFile) + if err != nil { + return err + } + + for _, extraPackage := range extraPackages { + log.Printf("Installing extra package %v", extraPackage) + // We expect 'pip' command in virtual env to be already available at the top of the PATH. + args := []string{"install", extraPackage} + if err := execx.Execute("pip", args...); err != nil { + return fmt.Errorf("Could not install the package %s: %s", extraPackage, err) + } + } + return nil +} + +func getUpdatedRequirementsFile(oldRequirementsFileName string, dependenciesDir string) (string, error) { + oldExtraPackages, err := getLines(filepath.Join(dependenciesDir, oldRequirementsFileName)) + if err != nil { + return "", err + } + var updatedExtraPackages = make([]string, 0) + for _, extraPackage := range oldExtraPackages { + // TODO update + potentialLocalFilePath := filepath.Join(dependenciesDir, extraPackage) + _, err := os.Stat(potentialLocalFilePath) + if err == nil { + // Package exists locally so using that. + extraPackage = potentialLocalFilePath + log.Printf("Using locally available extra package %v", extraPackage) + } + updatedExtraPackages = append(updatedExtraPackages, extraPackage) + } + + updatedRequirementsFile, err := ioutil.TempFile("/opt/apache/beam", "requirements*.txt") + if err != nil { + return "", err + } + + updatedRequirementsFileName := updatedRequirementsFile.Name() + + datawriter := bufio.NewWriter(updatedRequirementsFile) + for _, extraPackage := range updatedExtraPackages { + _, _ = datawriter.WriteString(extraPackage + "\n") + } + datawriter.Flush() + updatedRequirementsFile.Close() + + return updatedRequirementsFileName, nil +} + func launchExpansionServiceProcess() error { pythonVersion, err := expansionx.GetPythonVersion() if err != nil { @@ -70,6 +144,24 @@ func launchExpansionServiceProcess() error { os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":")) args := []string{"-m", expansionServiceEntrypoint, "-p", strconv.Itoa(*port), "--fully_qualified_name_glob", "*"} + + if *requirements_file != "" { + log.Printf("Received the requirements file %v", *requirements_file) + updatedRequirementsFileName, err := getUpdatedRequirementsFile(*requirements_file, *dependencies_dir) + if err != nil { + return err + } + defer os.Remove(updatedRequirementsFileName) + log.Printf("Updated requirements file is %v", updatedRequirementsFileName) + // Provide the requirements file to the expansion service so that packages get staged by runners. + args = append(args, "--requirements_file", updatedRequirementsFileName) + // Install packages locally so that they can be used by the expansion service during transform + // expansion if needed. + err = installExtraPackages(updatedRequirementsFileName) + if err != nil { + return err + } + } if err := execx.Execute(pythonVersion, args...); err != nil { return fmt.Errorf("could not start the expansion service: %s", err) }