Skip to content

Commit

Permalink
Merge pull request #29283 Cross language pipelines without Docker.
Browse files Browse the repository at this point in the history
Allow local runners to execute arbitrary cross language pipelines without Docker.
  • Loading branch information
robertwb authored Dec 1, 2023
2 parents 5f6afce + d85ce3f commit f337c74
Show file tree
Hide file tree
Showing 21 changed files with 634 additions and 120 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* The Python SDK now type checks `collections.abc.Collections` types properly. Some type hints that were erroneously allowed by the SDK may now fail. ([#29272](https://github.com/apache/beam/pull/29272))
* Running multi-language pipelines locally no longer requires Docker.
Instead, the same (generally auto-started) subprocess used to perform the
expansion can also be used as the cross-language worker.

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1563,13 +1563,26 @@ message Environment {

message StandardEnvironments {
enum Environments {
DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"]; // A managed docker container to run user code.

PROCESS = 1 [(beam_urn) = "beam:env:process:v1"]; // A managed native process to run user code.

EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"]; // An external non managed process to run user code.

DEFAULT = 3 [(beam_urn) = "beam:env:default:v1"]; // Used as a stub when context is missing a runner-provided default environment.
// A managed docker container to run user code.
// Payload should be DockerPayload.
DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"];

// A managed native process to run user code.
// Payload should be ProcessPayload.
PROCESS = 1 [(beam_urn) = "beam:env:process:v1"];

// An external non managed process to run user code.
// Payload should be ExternalPayload.
EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"];

// Used as a stub when context is missing a runner-provided default environment.
DEFAULT = 3 [(beam_urn) = "beam:env:default:v1"];

// A selection of equivalent fully-specified environments a runner may use.
// Note that this environment itself does not declare any dependencies or capabilities,
// as those may differ among the several alternatives.
// Payload should be AnyOfEnvironmentPayload.
ANYOF = 4 [(beam_urn) = "beam:env:anyof:v1"];
}
}

Expand All @@ -1586,10 +1599,15 @@ message ProcessPayload {
}

message ExternalPayload {
ApiServiceDescriptor endpoint = 1;
ApiServiceDescriptor endpoint = 1; // Serving BeamFnExternalWorkerPool API.
map<string, string> params = 2; // Arbitrary extra parameters to pass
}

message AnyOfEnvironmentPayload {
// Each is fully contained (with their own dependencies, capabilities, etc.)
repeated Environment environments = 1;
}

// These URNs are used to indicate capabilities of environments that cannot
// simply be expressed as a component (such as a Coder or PTransform) that this
// environment understands.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.AnyOfEnvironmentPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.ArtifactInformation;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload;
Expand Down Expand Up @@ -291,6 +294,50 @@ public static Environment createProcessEnvironment(
.build();
}

public static Environment createAnyOfEnvironment(Environment... environments) {
AnyOfEnvironmentPayload.Builder payload = AnyOfEnvironmentPayload.newBuilder();
for (Environment environment : environments) {
payload.addEnvironments(environment);
}
return Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.ANYOF))
.setPayload(payload.build().toByteString())
.build();
}

public static List<Environment> expandAnyOfEnvironments(Environment environment) {
return Stream.of(environment)
.flatMap(
env -> {
if (BeamUrns.getUrn(StandardEnvironments.Environments.ANYOF)
.equals(environment.getUrn())) {
try {
return AnyOfEnvironmentPayload.parseFrom(environment.getPayload())
.getEnvironmentsList().stream()
.flatMap(subenv -> expandAnyOfEnvironments(subenv).stream());
} catch (InvalidProtocolBufferException exn) {
throw new RuntimeException(exn);
}
} else {
return Stream.of(env);
}
})
.collect(Collectors.toList());
}

public static Environment resolveAnyOfEnvironment(
Environment environment, String... preferredEnvironmentTypes) {
List<Environment> allEnvironments = expandAnyOfEnvironments(environment);
for (String urn : preferredEnvironmentTypes) {
for (Environment env : allEnvironments) {
if (urn.equals(env.getUrn())) {
return env;
}
}
}
return allEnvironments.iterator().next();
}

public static Optional<Environment> getEnvironment(String ptransformId, Components components) {
PTransform ptransform = components.getTransformsOrThrow(ptransformId);
String envId = ptransform.getEnvironmentId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@

import static org.apache.beam.runners.core.construction.Environments.JAVA_SDK_HARNESS_CONTAINER_URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.beam.model.pipeline.v1.Endpoints;
Expand Down Expand Up @@ -353,4 +356,42 @@ public void testGetArtifactsBadNamedFileLogsWarn() throws Exception {
assertThat(artifacts, hasSize(1));
expectedLogs.verifyWarn("name 'file_name' was not found");
}

@Test
public void testExpandAnyOfEnvironmentsOnOrdinaryEnvironment() {
Environment env = Environments.createDockerEnvironment("java");
assertThat(Environments.expandAnyOfEnvironments(env), contains(env));
}

@Test
public void testExpandAnyOfEnvironmentsOnNestedEnvironment() {
Environment envA = Environments.createDockerEnvironment("A");
Environment envB = Environments.createDockerEnvironment("B");
Environment envC = Environments.createDockerEnvironment("C");
Environment env =
Environments.createAnyOfEnvironment(envA, Environments.createAnyOfEnvironment(envB, envC));
assertThat(Environments.expandAnyOfEnvironments(env), contains(envA, envB, envC));
}

@Test
public void testResolveAnyOfEnvironment() {
Environment dockerEnv = Environments.createDockerEnvironment("A");
Environment processEnv =
Environments.createProcessEnvironment("os", "arch", "cmd", new HashMap<>());
Environment env =
Environments.createAnyOfEnvironment(
dockerEnv, Environments.createAnyOfEnvironment(processEnv));
assertThat(
Environments.resolveAnyOfEnvironment(
env, BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER)),
equalTo(dockerEnv));
assertThat(
Environments.resolveAnyOfEnvironment(
env, BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS)),
equalTo(processEnv));
assertThat(
Environments.resolveAnyOfEnvironment(
env, BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL)),
notNullValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,21 @@ public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getRepla
}
}

private RunnerApi.Pipeline resolveAnyOfEnvironments(RunnerApi.Pipeline pipeline) {
RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
RunnerApi.Components.Builder componentsBuilder = pipelineBuilder.getComponentsBuilder();
componentsBuilder.clearEnvironments();
for (Map.Entry<String, RunnerApi.Environment> entry :
pipeline.getComponents().getEnvironmentsMap().entrySet()) {
componentsBuilder.putEnvironments(
entry.getKey(),
Environments.resolveAnyOfEnvironment(
entry.getValue(),
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)));
}
return pipelineBuilder.build();
}

protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
RunnerApi.Pipeline pipeline, DataflowPipelineOptions options) {
String sdkHarnessContainerImageOverrides = options.getSdkHarnessContainerImageOverrides();
Expand Down Expand Up @@ -1173,6 +1188,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
PipelineTranslation.toProto(pipeline, portableComponents, false);
// Note that `stageArtifacts` has to be called before `resolveArtifact` because
// `resolveArtifact` updates local paths to staged paths in pipeline proto.
portablePipelineProto = resolveAnyOfEnvironments(portablePipelineProto);
List<DataflowPackage> packages = stageArtifacts(portablePipelineProto);
portablePipelineProto = resolveArtifacts(portablePipelineProto);
portablePipelineProto = applySdkEnvironmentOverrides(portablePipelineProto, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ private ImmutableList<EnvironmentCacheAndLock> createEnvironmentCaches(
new CacheLoader<Environment, WrappedSdkHarnessClient>() {
@Override
public WrappedSdkHarnessClient load(Environment environment) throws Exception {
// TODO(robertwb): Docker is the the safest fallback (if we are distributed)
// but it would be good to have the ability to make a more intellegent choice
// (e.g. in-process or loopback workers, especially if running locally).
environment =
Environments.resolveAnyOfEnvironment(
environment, BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER));
EnvironmentFactory.Provider environmentFactoryProvider =
environmentFactoryProviderMap.get(environment.getUrn());
ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.sdk.fn.server.FnService;
Expand All @@ -62,7 +63,6 @@
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -198,11 +198,7 @@ public void prepare(

stagingService
.getService()
.registerJob(
stagingSessionToken,
Maps.transformValues(
request.getPipeline().getComponents().getEnvironmentsMap(),
RunnerApi.Environment::getDependenciesList));
.registerJob(stagingSessionToken, extractDependencies(request.getPipeline()));

// send response
PrepareJobResponse response =
Expand Down Expand Up @@ -287,26 +283,50 @@ public void run(RunJobRequest request, StreamObserver<RunJobResponse> responseOb
}
}

private Map<String, List<RunnerApi.ArtifactInformation>> extractDependencies(
RunnerApi.Pipeline pipeline) {
Map<String, List<RunnerApi.ArtifactInformation>> dependencies = new HashMap<>();
for (Map.Entry<String, RunnerApi.Environment> entry :
pipeline.getComponents().getEnvironmentsMap().entrySet()) {
List<RunnerApi.Environment> subEnvs = Environments.expandAnyOfEnvironments(entry.getValue());
for (int i = 0; i < subEnvs.size(); i++) {
dependencies.put(i + ":" + entry.getKey(), subEnvs.get(i).getDependenciesList());
}
}
return dependencies;
}

private RunnerApi.Pipeline resolveDependencies(RunnerApi.Pipeline pipeline, String stagingToken) {
Map<String, List<RunnerApi.ArtifactInformation>> resolvedDependencies =
stagingService.getService().getStagedArtifacts(stagingToken);
Map<String, RunnerApi.Environment> newEnvironments = new HashMap<>();
for (Map.Entry<String, RunnerApi.Environment> entry :
pipeline.getComponents().getEnvironmentsMap().entrySet()) {
if (entry.getValue().getDependenciesCount() > 0 && resolvedDependencies == null) {
throw new RuntimeException(
"Artifact dependencies provided but not staged for " + entry.getKey());
List<RunnerApi.Environment> subEnvs = Environments.expandAnyOfEnvironments(entry.getValue());
List<RunnerApi.Environment> newSubEnvs = new ArrayList<>();
for (int i = 0; i < subEnvs.size(); i++) {
RunnerApi.Environment subEnv = subEnvs.get(i);
if (subEnv.getDependenciesCount() > 0 && resolvedDependencies == null) {
throw new RuntimeException(
"Artifact dependencies provided but not staged for " + entry.getKey());
}
newSubEnvs.add(
subEnv.getDependenciesCount() == 0
? subEnv
: subEnv
.toBuilder()
.clearDependencies()
.addAllDependencies(resolvedDependencies.get(i + ":" + entry.getKey()))
.build());
}
if (newSubEnvs.size() == 1) {
newEnvironments.put(entry.getKey(), newSubEnvs.get(0));
} else {
newEnvironments.put(
entry.getKey(),
Environments.createAnyOfEnvironment(
newSubEnvs.toArray(new RunnerApi.Environment[newSubEnvs.size()])));
}
newEnvironments.put(
entry.getKey(),
entry.getValue().getDependenciesCount() == 0
? entry.getValue()
: entry
.getValue()
.toBuilder()
.clearDependencies()
.addAllDependencies(resolvedDependencies.get(entry.getKey()))
.build());
}
RunnerApi.Pipeline.Builder builder = pipeline.toBuilder();
builder.getComponentsBuilder().clearEnvironments().putAllEnvironments(newEnvironments);
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ test {

dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":model:fn-execution", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:core-construction-java")
implementation project(path: ":runners:java-fn-execution")
implementation project(path: ":sdks:java:fn-execution")
implementation project(path: ":sdks:java:harness")
permitUnusedDeclared project(path: ":model:fn-execution")
permitUnusedDeclared project(path: ":sdks:java:fn-execution")
implementation library.java.jackson_annotations
implementation library.java.jackson_databind
Expand Down
Loading

0 comments on commit f337c74

Please sign in to comment.