From e3e0f5b7b2f584d0850ca946f3a5d18e370b51d0 Mon Sep 17 00:00:00 2001 From: Axel Magnuson Date: Tue, 6 Mar 2018 13:36:52 -0800 Subject: [PATCH 1/2] Add generic JobService framework for runners. Adds a JobService implementation to java-fn-execution that should be generalizable across runners that implement the new JobInvoker and JobInvocation interfaces. --- .../fnexecution/artifact/ArtifactSource.java | 3 +- .../artifact/ArtifactStagingService.java | 32 +++ .../ArtifactStagingServiceProvider.java | 11 + .../artifact/GrpcArtifactProxyService.java | 1 - .../SingletonDockerEnvironmentManager.java | 1 - .../jobsubmission/JobInvocation.java | 41 +++ .../fnexecution/jobsubmission/JobInvoker.java | 15 ++ .../jobsubmission/JobPreparation.java | 29 +++ .../fnexecution/jobsubmission/JobService.java | 234 ++++++++++++++++++ .../TransformStreamObserver.java | 58 +++++ .../jobsubmission/package-info.java | 22 ++ .../jobsubmission/JobServiceTest.java | 44 ++++ .../artifact/local/LocalArtifactSource.java | 62 +++++ .../LocalFileSystemArtifactStagerService.java | 15 +- 14 files changed, 559 insertions(+), 9 deletions(-) create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingServiceProvider.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/TransformStreamObserver.java create mode 100644 runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java create mode 100644 runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServiceTest.java create mode 100644 runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalArtifactSource.java diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java index 767bcf0ea3f2..a8b582640b55 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java @@ -18,9 +18,8 @@ package org.apache.beam.runners.fnexecution.artifact; -import java.io.IOException; - import io.grpc.stub.StreamObserver; +import java.io.IOException; import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java new file mode 100644 index 000000000000..1f2b4f63ed63 --- /dev/null +++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import org.apache.beam.runners.fnexecution.FnService; + +/** An implementation of the Beam Artifact Staging Service. */ +public interface ArtifactStagingService extends FnService { + /** + * Get an artifact source that can access staged artifacts. + * + *
<p>
Once an artifact staging service has staged artifacts, runners need a way to access them. + * Thus this method provides an ArtifactSource that can access them. + */ + ArtifactSource createAccessor(); +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingServiceProvider.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingServiceProvider.java new file mode 100644 index 000000000000..e72e57b0844c --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingServiceProvider.java @@ -0,0 +1,11 @@ +package org.apache.beam.runners.fnexecution.artifact; + +import java.io.IOException; +import org.apache.beam.runners.fnexecution.GrpcFnServer; + +/** + * An interface that will provide artifact staging services for individual jobs. + */ +public interface ArtifactStagingServiceProvider { + GrpcFnServer forJob(String jobPreparationId) throws IOException; +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/GrpcArtifactProxyService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/GrpcArtifactProxyService.java index 933e083b0830..91fb3da36c57 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/GrpcArtifactProxyService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/GrpcArtifactProxyService.java @@ -3,7 +3,6 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; -import java.util.stream.Stream; import org.apache.beam.model.jobmanagement.v1.ArtifactApi; import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc; import org.apache.beam.runners.fnexecution.FnService; diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManager.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManager.java index 4b3b730976e8..a428a47de397 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManager.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/SingletonDockerEnvironmentManager.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; -import javax.annotation.concurrent.GuardedBy; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; import org.apache.beam.runners.fnexecution.GrpcFnServer; import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java new file mode 100644 index 000000000000..d6e397bf6b21 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java @@ -0,0 +1,41 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import io.grpc.stub.StreamObserver; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; + +/** + * Internal representation of a Job which has been invoked (prepared and run) by a client. 
+ */ +public interface JobInvocation { + + /** + * Start the job. + */ + void start(); + + /** + * @return Unique identifier for the job invocation. + */ + String getId(); + + /** + * Cancel the job. + */ + void cancel(); + + /** + * Retrieve the job's current state. + */ + JobState.Enum getState(); + + /** + * Observe job state changes with an observer. + */ + void addStateObserver(StreamObserver stateStreamObserver); + + /** + * Observe job messages with an observer. + */ + void addMessageObserver(StreamObserver messageStreamObserver); +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java new file mode 100644 index 000000000000..eb5f477551d9 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java @@ -0,0 +1,15 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import java.io.IOException; +import javax.annotation.Nullable; + +/** + * Factory to create a {@link JobInvocation} instances. + */ +public interface JobInvoker { + /** + * Create a {@link JobInvocation} instance from a {@link JobPreparation} and an artifact token. + */ + JobInvocation invoke(JobPreparation preparation, @Nullable String artifactToken) + throws IOException; +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java new file mode 100644 index 000000000000..871fbaa30202 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java @@ -0,0 +1,29 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Struct; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; + +/** A job that has been prepared, but not invoked. 
*/ +@AutoValue +public abstract class JobPreparation { + public static Builder builder() { + return new AutoValue_JobPreparation.Builder(); + } + + public abstract String id(); + public abstract Pipeline pipeline(); + public abstract Struct options(); + public abstract GrpcFnServer stagingService(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setId(String id); + abstract Builder setPipeline(Pipeline pipeline); + abstract Builder setOptions(Struct options); + abstract Builder setStagingService(GrpcFnServer stagingService); + abstract JobPreparation build(); + } +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java new file mode 100644 index 000000000000..e67031eb65a3 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java @@ -0,0 +1,234 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A JobService that prepares and invokes Flink jobs. 
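This service implements the Job API's two-phase handshake: a client first calls Prepare to obtain a preparation id and an artifact staging endpoint, stages its artifacts there, and then calls Run to receive a job invocation id. A rough client-side sketch against the generated blocking stub follows; the channel, job name, pipeline proto, and options Struct are illustrative assumptions, not taken from this patch.

    import com.google.protobuf.Struct;
    import io.grpc.ManagedChannel;
    import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
    import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
    import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
    import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
    import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
    import org.apache.beam.model.pipeline.v1.RunnerApi;

    /** Hypothetical client helper; the channel and protos are built elsewhere. */
    static String prepareAndRun(ManagedChannel channel, RunnerApi.Pipeline pipeline, Struct options) {
      JobServiceGrpc.JobServiceBlockingStub stub = JobServiceGrpc.newBlockingStub(channel);

      // Phase 1: Prepare returns the preparation id and the artifact staging endpoint.
      PrepareJobResponse prepared =
          stub.prepare(
              PrepareJobRequest.newBuilder()
                  .setJobName("example-job") // assumed job name
                  .setPipeline(pipeline)
                  .setPipelineOptions(options)
                  .build());

      // Stage artifacts against prepared.getArtifactStagingEndpoint() here.

      // Phase 2: Run the prepared job; the response carries the invocation id.
      RunJobResponse run =
          stub.run(RunJobRequest.newBuilder().setPreparationId(prepared.getPreparationId()).build());
      return run.getJobId();
    }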
*/ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { + return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMap preparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { + this.artifactStagingServiceProvider = artifactStagingServiceProvider; + this.invoker = invoker; + + this.preparations = new ConcurrentHashMap<>(); + this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { + try { + LOG.trace("{} {}", PrepareJobResponse.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(request.getPipelineOptions()) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { + String errMessage = + String.format("A job with the preparation ID \"%s\" already exists.", preparationId); + StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException(); + responseObserver.onError(exception); + return; + } + + // send response + PrepareJobResponse response = + PrepareJobResponse + .newBuilder() + .setPreparationId(preparationId) + .setArtifactStagingEndpoint(stagingService.getApiServiceDescriptor()) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + LOG.error("Could not prepare job with name {}", request.getJobName(), e); + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + @Override + public void run( + RunJobRequest request, StreamObserver responseObserver) { + try { + LOG.trace("{} {}", RunJobRequest.class.getSimpleName(), request); + + // retrieve job preparation + String preparationId = request.getPreparationId(); + JobPreparation preparation = preparations.get(preparationId); + if (preparation == null) { + String errMessage = String.format("Unknown Preparation Id \"%s\".", preparationId); + StatusException exception = Status.NOT_FOUND.withDescription(errMessage).asException(); + responseObserver.onError(exception); + return; + } + + // create new invocation + JobInvocation invocation = invoker.invoke(preparation, request.getStagingToken()); + String invocationId = invocation.getId(); + invocation.start(); + invocations.put(invocationId, invocation); + RunJobResponse response = + RunJobResponse.newBuilder().setJobId(invocationId).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + } catch (Exception e) { + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + @Override + public void getState( + GetJobStateRequest request, StreamObserver 
responseObserver) { + try { + LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request); + String invocationId = request.getJobId(); + JobInvocation invocation = invocations.get(invocationId); + + JobState.Enum state; + synchronized (invocation) { + state = invocation.getState(); + } + + GetJobStateResponse response = GetJobStateResponse.newBuilder().setState(state).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + @Override + public void cancel(CancelJobRequest request, StreamObserver responseObserver) { + try { + LOG.trace("{} {}", CancelJobRequest.class.getSimpleName(), request); + String invocationId = request.getJobId(); + JobInvocation invocation = invocations.get(invocationId); + + JobState.Enum state; + synchronized (invocation) { + invocation.cancel(); + state = invocation.getState(); + } + + CancelJobResponse response = CancelJobResponse.newBuilder().setState(state).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + @Override + public void getStateStream( + GetJobStateRequest request, + StreamObserver responseObserver) { + try { + String invocationId = request.getJobId(); + JobInvocation invocation = invocations.get(invocationId); + + Function responseFunction = + state -> GetJobStateResponse.newBuilder().setState(state).build(); + TransformStreamObserver stateObserver = + TransformStreamObserver.create(responseFunction, responseObserver); + synchronized (invocation) { + invocation.addStateObserver(stateObserver); + } + } catch (Exception e) { + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + @Override + public void getMessageStream( + JobMessagesRequest request, + StreamObserver responseObserver) { + try { + String invocationId = request.getJobId(); + JobInvocation invocation = invocations.get(invocationId); + + Function stateResponseFunction = + state -> + JobMessagesResponse + .newBuilder() + .setStateResponse(GetJobStateResponse.newBuilder().setState(state).build()) + .build(); + TransformStreamObserver stateObserver = + TransformStreamObserver.create(stateResponseFunction, responseObserver); + + Function messagesResponseFunction = + message -> JobMessagesResponse.newBuilder().setMessageResponse(message).build(); + TransformStreamObserver messageObserver = + TransformStreamObserver.create(messagesResponseFunction, responseObserver); + + synchronized (invocation) { + invocation.addStateObserver(stateObserver); + invocation.addMessageObserver(messageObserver); + } + } catch (Exception e) { + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + @Override + public void close() throws Exception { + for (JobPreparation preparation : ImmutableList.copyOf(preparations.values())) { + try { + preparation.stagingService().close(); + } catch (Exception e) { + LOG.warn("Exception while closing job {}", preparation); + } + } + } +} diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/TransformStreamObserver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/TransformStreamObserver.java new file mode 100644 index 000000000000..604516c7b163 --- /dev/null +++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/TransformStreamObserver.java @@ -0,0 +1,58 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import io.grpc.stub.StreamObserver; +import java.util.function.Function; + +/** + * Utility class that transforms observations into one that an output observer can accept. + * + *
<p>
This class synchronizes on outputObserver when handling it, allowing for multiple + * TransformStreamObservers
to multiplex to a single outputObserver. + * + * @param <T1> Input Type + * @param <T2> Output Type + */
+class TransformStreamObserver<T1, T2> implements StreamObserver<T1> { + + /** + * Create a new TransformStreamObserver.
+ * + * @param transform The function used to transform observations. + * @param outputObserver The observer to forward
transform outputs to. + */ + public static <T1, T2> TransformStreamObserver<T1, T2> create( + Function<T1, T2> transform,
StreamObserver<T2> outputObserver) { + return new TransformStreamObserver<>(transform, outputObserver); + } + +
private final Function<T1, T2> transform; + private final StreamObserver<T2> outputObserver; + +
private TransformStreamObserver(Function<T1, T2> transform, StreamObserver<T2> outputObserver) { + this.transform = transform;
+ this.outputObserver = outputObserver; + } + + @Override + public void onNext(T1 i) { + T2 o = transform.apply(i);
+ synchronized (outputObserver) { + outputObserver.onNext(o); + } + } + + @Override + public void onError(Throwable throwable) {
+ synchronized (outputObserver) { + outputObserver.onError(throwable); + } + } + + @Override + public void onCompleted() {
+ synchronized (outputObserver) { + outputObserver.onCompleted(); + } + } +} + diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
new file mode 100644 index 000000000000..1116b6aeb168 --- /dev/null +++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
@@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright
ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not
use this file except in compliance + * with the License. You may obtain a copy of the License at + * +
* http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. + * See the License for the specific language governing permissions and + * limitations under
the License. + */ + +/** + * Internal job management service implementation shared by Beam runners. 
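In practice a runner hosts this package by pairing an ArtifactStagingServiceProvider with a JobInvoker and serving the resulting JobService over gRPC; the FlinkJobServerDriver added in the second patch does exactly this for Flink. Below is a minimal runner-agnostic sketch, meant to sit in some server bootstrap method; the staging directory and the placeholder invoker are illustrative assumptions, not part of this change.

    import java.io.File;
    import org.apache.beam.artifact.local.LocalFileSystemArtifactStagerService;
    import org.apache.beam.runners.fnexecution.GrpcFnServer;
    import org.apache.beam.runners.fnexecution.InProcessServerFactory;
    import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
    import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;

    // One artifact staging server per job preparation, rooted at an assumed local directory.
    ArtifactStagingServiceProvider stagingProvider =
        preparationId -> {
          ArtifactStagingService stager =
              LocalFileSystemArtifactStagerService.withRootDirectory(
                  new File("/tmp/beam-artifact-staging", preparationId));
          return GrpcFnServer.allocatePortAndCreateFor(stager, InProcessServerFactory.create());
        };

    // Placeholder invoker; a real runner builds and returns its own JobInvocation here.
    JobInvoker invoker =
        (preparation, artifactToken) -> {
          throw new UnsupportedOperationException("runner-specific JobInvocation goes here");
        };

    JobService service = JobService.create(stagingProvider, invoker);
    GrpcFnServer<JobService> jobServer =
        GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create());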
+ */ +package org.apache.beam.runners.fnexecution.jobsubmission; diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServiceTest.java new file mode 100644 index 000000000000..8438c3d34815 --- /dev/null +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServiceTest.java @@ -0,0 +1,44 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.InProcessServerFactory; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link JobService}. */ +@RunWith(JUnit4.class) +public class JobServiceTest { + @Mock + GrpcFnServer artifactStagingServer; + @Mock + JobInvoker invoker; + + JobService service; + GrpcFnServer server; + + @Before + void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + ArtifactStagingServiceProvider provider = ignored -> artifactStagingServer; + service = JobService.create(provider, invoker); + server = GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create()); + } + + @After + void tearDown() throws Exception { + server.close(); + } + + @Test + void testJobSuccessfullyProcessed() throws Exception { + // TODO: prepare and start a job + + } +} diff --git a/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalArtifactSource.java b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalArtifactSource.java new file mode 100644 index 000000000000..77b93341b363 --- /dev/null +++ b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalArtifactSource.java @@ -0,0 +1,62 @@ +package org.apache.beam.artifact.local; + +import com.google.protobuf.ByteString; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; + +/** + * An artifact source drawn from a local file system. 
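On the runner side, an ArtifactSource is consumed by fetching the manifest and then streaming each named artifact through a StreamObserver. The collecting helper below is an illustrative sketch; the blocking future is a convenience of this example, not something the interface requires.

    import com.google.common.util.concurrent.SettableFuture;
    import com.google.protobuf.ByteString;
    import io.grpc.stub.StreamObserver;
    import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
    import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;

    /** Reads one staged artifact fully into memory; artifact names come from source.getManifest(). */
    static ByteString readArtifact(ArtifactSource source, String name) throws Exception {
      SettableFuture<ByteString> done = SettableFuture.create();
      source.getArtifact(
          name,
          new StreamObserver<ArtifactChunk>() {
            private ByteString bytes = ByteString.EMPTY;

            @Override
            public void onNext(ArtifactChunk chunk) {
              // Concatenate chunks in arrival order.
              bytes = bytes.concat(chunk.getData());
            }

            @Override
            public void onError(Throwable t) {
              done.setException(t);
            }

            @Override
            public void onCompleted() {
              done.set(bytes);
            }
          });
      return done.get();
    }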
+ */ +public class LocalArtifactSource implements ArtifactSource { + private static final int DEFAULT_CHUNK_SIZE_BYTES = 2 * 1024 * 1024; + + public static LocalArtifactSource create(LocalArtifactStagingLocation location) { + return new LocalArtifactSource(location); + } + + private final LocalArtifactStagingLocation location; + + private LocalArtifactSource(LocalArtifactStagingLocation location) { + this.location = location; + } + + @Override + public ArtifactApi.Manifest getManifest() throws IOException { + File manifestFile = location.getManifestFile(); + try (FileInputStream fileInputStream = new FileInputStream(manifestFile)) { + return ArtifactApi.Manifest.parseFrom(fileInputStream); + } + } + + @Override + public void getArtifact(String name, StreamObserver responseObserver) { + File artifactFile = location.getArtifactFile(name); + try (FileInputStream fStream = new FileInputStream(artifactFile)) { + byte[] buffer = new byte[DEFAULT_CHUNK_SIZE_BYTES]; + for (int bytesRead = fStream.read(buffer); bytesRead > 0; bytesRead = fStream.read(buffer)) { + ByteString data = ByteString.copyFrom(buffer, 0, bytesRead); + responseObserver.onNext(ArtifactApi.ArtifactChunk.newBuilder().setData(data).build()); + } + responseObserver.onCompleted(); + } catch (FileNotFoundException e) { + responseObserver.onError( + Status.INVALID_ARGUMENT + .withDescription(String.format("No such artifact %s", name)) + .withCause(e) + .asException()); + } catch (Exception e) { + responseObserver.onError( + Status.INTERNAL + .withDescription( + String.format("Could not retrieve artifact with name %s", name)) + .withCause(e) + .asException()); + } + } +} diff --git a/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java index 049d6147633b..596f85ff0c72 100644 --- a/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java +++ b/runners/local-artifact-service-java/src/main/java/org/apache/beam/artifact/local/LocalFileSystemArtifactStagerService.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkState; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import io.grpc.Status; import io.grpc.StatusException; @@ -35,13 +34,15 @@ import javax.annotation.Nullable; import org.apache.beam.model.jobmanagement.v1.ArtifactApi; import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc; -import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** An {@code ArtifactStagingService} which stages files to a local temp directory. 
*/ public class LocalFileSystemArtifactStagerService - extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService { + extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase + implements ArtifactStagingService { private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemArtifactStagerService.class); @@ -109,13 +110,17 @@ private void commitManifestOrThrow( responseObserver.onCompleted(); } + @Override + public ArtifactSource createAccessor() { + return LocalArtifactSource.create(location); + } + @Override public void close() throws Exception { // TODO: Close all active staging calls, signalling errors to the caller. } - @VisibleForTesting - LocalArtifactStagingLocation getLocation() { + public LocalArtifactStagingLocation getLocation() { return location; } From c7dbaa729827a462a7a9bf6dcd289ca0ed2f43a2 Mon Sep 17 00:00:00 2001 From: Axel Magnuson Date: Tue, 6 Mar 2018 13:42:28 -0800 Subject: [PATCH 2/2] Add job submission capabilities to Flink runner. This implements job invocation wrappers for the Flink runner, as well as a standalone driver for a JobService daemon. Together these complete the job submission story for the portability framework on the Flink runner. --- runners/flink/build.gradle | 2 + .../flink/FlinkCachedArtifactNames.java | 31 ++++ .../flink/FlinkCachedArtifactPaths.java | 31 ---- .../runners/flink/FlinkJobInvocation.java | 88 ++++++++++++ .../beam/runners/flink/FlinkJobInvoker.java | 42 ++++++ .../runners/flink/FlinkJobServerDriver.java | 134 ++++++++++++++++++ .../FlinkPipelineExecutionEnvironment.java | 82 ++++++++++- .../beam/runners/flink/FlinkRunner.java | 21 +++ ...tSource.java => CachedArtifactSource.java} | 20 +-- 9 files changed, 409 insertions(+), 42 deletions(-) create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactNames.java delete mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactPaths.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java rename runners/flink/src/main/java/org/apache/beam/runners/flink/execution/{FlinkArtifactSource.java => CachedArtifactSource.java} (76%) diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle index 6603f297e474..2b377ddcc5de 100644 --- a/runners/flink/build.gradle +++ b/runners/flink/build.gradle @@ -54,11 +54,13 @@ dependencies { shadow project(path: ":runners:core-java", configuration: "shadow") shadow project(path: ":runners:core-construction-java", configuration: "shadow") shadow project(path: ":runners:java-fn-execution", configuration: "shadow") + shadow project(path: ":runners:local-artifact-service-java", configuration: "shadow") shadow library.java.jackson_annotations shadow library.java.findbugs_jsr305 shadow library.java.slf4j_api shadow library.java.joda_time shadow library.java.commons_compress + shadow library.java.args4j shadow "org.apache.flink:flink-clients_2.11:$flink_version" shadow "org.apache.flink:flink-core:$flink_version" shadow "org.apache.flink:flink-metrics-core:$flink_version" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactNames.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactNames.java new file mode 100644 index 
000000000000..c402601d43be --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactNames.java @@ -0,0 +1,31 @@ +package org.apache.beam.runners.flink; + +/** + * Determines artifact path names within the + * {@link org.apache.flink.api.common.cache.DistributedCache}. + */ +public class FlinkCachedArtifactNames { + private static final String DEFAULT_ARTIFACT_TOKEN = "default"; + + public static FlinkCachedArtifactNames createDefault() { + return new FlinkCachedArtifactNames(DEFAULT_ARTIFACT_TOKEN); + } + + public static FlinkCachedArtifactNames forToken(String artifactToken) { + return new FlinkCachedArtifactNames(artifactToken); + } + + private final String token; + + private FlinkCachedArtifactNames(String token) { + this.token = token; + } + + public String getArtifactHandle(String name) { + return String.format("ARTIFACT_%s_%s", token, name); + } + + public String getManifestHandle() { + return String.format("MANIFEST_%s", token); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactPaths.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactPaths.java deleted file mode 100644 index 73eb94c44fa9..000000000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactPaths.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.beam.runners.flink; - -/** - * Determines artifact path names within the - * {@link org.apache.flink.api.common.cache.DistributedCache}. - */ -public class FlinkCachedArtifactPaths { - private static final String DEFAULT_ARTIFACT_TOKEN = "default"; - - public static FlinkCachedArtifactPaths createDefault() { - return new FlinkCachedArtifactPaths(DEFAULT_ARTIFACT_TOKEN); - } - - public static FlinkCachedArtifactPaths forToken(String artifactToken) { - return new FlinkCachedArtifactPaths(artifactToken); - } - - private final String token; - - private FlinkCachedArtifactPaths(String token) { - this.token = token; - } - - public String getArtifactPath(String name) { - return String.format("ARTIFACT_%s_%s", token, name); - } - - public String getManifestPath() { - return String.format("MANIFEST_%s", token); - } -} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java new file mode 100644 index 000000000000..524e77fb9cd9 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java @@ -0,0 +1,88 @@ +package org.apache.beam.runners.flink; + + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import io.grpc.stub.StreamObserver; +import javax.annotation.Nullable; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Invocation of a Flink Job via {@link FlinkRunner}. 
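For reference, the handle scheme introduced by FlinkCachedArtifactNames above keys DistributedCache entries by artifact token, so the runner and the task side resolve the same names. With a made-up token and file name:

    FlinkCachedArtifactNames names = FlinkCachedArtifactNames.forToken("job-42");
    String manifestHandle = names.getManifestHandle();          // "MANIFEST_job-42"
    String artifactHandle = names.getArtifactHandle("udf.jar"); // "ARTIFACT_job-42_udf.jar"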
+ */ +public class FlinkJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); + + public static FlinkJobInvocation create( + String id, + ListeningExecutorService executorService, + FlinkRunner runner, Pipeline pipeline) { + return new FlinkJobInvocation(id, executorService, runner, pipeline); + } + + private final String id; + private final ListeningExecutorService executorService; + private final FlinkRunner runner; + private final Pipeline pipeline; + + @Nullable + private ListenableFuture invocationFuture; + + private FlinkJobInvocation( + String id, + ListeningExecutorService executorService, + FlinkRunner runner, + Pipeline pipeline) { + this.id = id; + this.executorService = executorService; + this.runner = runner; + this.pipeline = pipeline; + this.invocationFuture = null; + } + + @Override + public void start() { + LOG.trace("Starting job invocation {}", getId()); + synchronized (this) { + invocationFuture = executorService.submit(() -> runner.run(pipeline)); + } + } + + @Override + public String getId() { + return id; + } + + @Override + public void cancel() { + LOG.trace("Canceling job invocation {}", getId()); + synchronized (this) { + if (this.invocationFuture != null) { + this.invocationFuture.cancel(true /* mayInterruptIfRunning */); + } + } + } + + @Override + public Enum getState() { + LOG.warn("getState() not yet implemented."); + return Enum.UNSPECIFIED; + } + + @Override + public void addStateObserver(StreamObserver stateStreamObserver) { + LOG.warn("addStateObserver() not yet implemented."); + stateStreamObserver.onNext(getState()); + } + + @Override + public void addMessageObserver(StreamObserver messageStreamObserver) { + LOG.warn("addMessageObserver() not yet implemented."); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java new file mode 100644 index 000000000000..1f7168ab3be2 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -0,0 +1,42 @@ +package org.apache.beam.runners.flink; + +import com.google.common.util.concurrent.ListeningExecutorService; +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.apache.beam.runners.fnexecution.jobsubmission.JobPreparation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Job Invoker for the {@link FlinkRunner}. 
+ */ +public class FlinkJobInvoker implements JobInvoker { + public static FlinkJobInvoker create(ListeningExecutorService executorService) { + return new FlinkJobInvoker(executorService); + } + + private final ListeningExecutorService executorService; + + private FlinkJobInvoker(ListeningExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public JobInvocation invoke(JobPreparation preparation, @Nullable String artifactToken) + throws IOException { + String invocationId = + String.format("%s_%d", preparation.id(), ThreadLocalRandom.current().nextInt()); + PipelineOptions options = PipelineOptionsTranslation.fromProto(preparation.options()); + Pipeline pipeline = PipelineTranslation.fromProto(preparation.pipeline()); + FlinkRunner runner = FlinkRunner.fromOptions(options); + ArtifactSource artifactSource = preparation.stagingService().getService().createAccessor(); + runner.setArtifactSource(artifactSource); + return FlinkJobInvocation.create(invocationId, executorService, runner, pipeline); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java new file mode 100644 index 000000000000..f970f26e90c8 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java @@ -0,0 +1,134 @@ +package org.apache.beam.runners.flink; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.apache.beam.artifact.local.LocalFileSystemArtifactStagerService; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.ServerFactory; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.apache.beam.runners.fnexecution.jobsubmission.JobService; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** Driver program that starts a job server. 
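The driver is launched through its main method or an equivalent java command line; --job-host is required, and --artifacts-dir defaults to /tmp/beam-artifact-staging. The host and directory values below are examples only.

    // Launch a Flink job server that listens on the given host string
    // and stages artifacts under the given directory.
    FlinkJobServerDriver.main(
        new String[] {
          "--job-host", "localhost:8099",
          "--artifacts-dir", "/tmp/flink-artifacts"
        });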
*/ +public class FlinkJobServerDriver implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class); + + private static class ServerConfiguration { + @Option( + name = "--job-host", + required = true, + usage = "The job server host string" + ) + private String host = ""; + + @Option( + name = "--artifacts-dir", + usage = "The location to store staged artifact files" + ) + private String artifactStagingPath = "/tmp/beam-artifact-staging"; + } + + public static void main(String[] args) { + ServerConfiguration configuration = new ServerConfiguration(); + CmdLineParser parser = new CmdLineParser(configuration); + try { + parser.parseArgument(args); + } catch (CmdLineException e) { + e.printStackTrace(System.err); + printUsage(parser); + return; + } + FlinkJobServerDriver driver = fromConfig(configuration); + driver.run(); + } + + private static void printUsage(CmdLineParser parser) { + System.err.println( + String.format( + "Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName())); + parser.printUsage(System.err); + System.err.println(); + } + + public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration) { + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build(); + ListeningExecutorService executor = + MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory)); + ServerFactory serverFactory = ServerFactory.createDefault(); + return create(configuration, executor, serverFactory); + } + + public static FlinkJobServerDriver create( + ServerConfiguration configuration, + ListeningExecutorService executor, + ServerFactory serverFactory) { + return new FlinkJobServerDriver(configuration, executor, serverFactory); + } + + private final ListeningExecutorService executor; + private final ServerConfiguration configuration; + private final ServerFactory serverFactory; + + private FlinkJobServerDriver( + ServerConfiguration configuration, + ListeningExecutorService executor, + ServerFactory serverFactory) { + this.configuration = configuration; + this.executor = executor; + this.serverFactory = serverFactory; + } + + @Override + public void run() { + try { + GrpcFnServer server = createJobServer(); + server.getServer().awaitTermination(); + } catch (InterruptedException e) { + LOG.warn("Job server interrupted", e); + } catch (Exception e) { + LOG.warn("Exception during job server creation", e); + } + } + + private GrpcFnServer createJobServer() throws IOException { + JobService service = createJobService(); + Endpoints.ApiServiceDescriptor descriptor = + Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build(); + return GrpcFnServer.create(service, descriptor, serverFactory); + } + + private JobService createJobService() { + ArtifactStagingServiceProvider artifactStagingServiceProvider = + createArtifactStagingServiceProvider(); + JobInvoker invoker = createJobInvoker(); + return JobService.create(artifactStagingServiceProvider, invoker); + } + + private ArtifactStagingServiceProvider createArtifactStagingServiceProvider() { + return jobPreparationId -> { + Path location = Paths.get(configuration.artifactStagingPath).resolve(jobPreparationId); + ArtifactStagingService service = + LocalFileSystemArtifactStagerService.withRootDirectory(location.toFile()); + return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory); + }; + } + + private JobInvoker createJobInvoker() { + return 
FlinkJobInvoker.create(executor); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 7f7281e14bd9..2acbc3953b74 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -19,9 +19,19 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.io.Files; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.stub.StreamObserver; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.file.Path; import java.util.List; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; import org.apache.beam.sdk.Pipeline; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.CollectionEnvironment; @@ -42,10 +52,39 @@ * transform the Beam job into a Flink one, and executes the (translated) job. */ class FlinkPipelineExecutionEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); + private static class ArtifactWriter implements StreamObserver { + + private final FileOutputStream outputStream; + public final SettableFuture result; + + public ArtifactWriter(FileOutputStream outputStream) { + this.outputStream = outputStream; + this.result = SettableFuture.create(); + } + + @Override + public void onNext(ArtifactChunk artifactChunk) { + try { + outputStream.write(artifactChunk.getData().toByteArray()); + } catch (IOException e) { + onError(e); + } + } + + @Override + public void onError(Throwable throwable) { + result.setException(throwable); + } + + @Override + public void onCompleted() { + result.set(null); + } + } + private final FlinkPipelineOptions options; /** @@ -260,4 +299,45 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() { return flinkStreamEnv; } + public void loadStagedArtifacts(ArtifactSource artifactSource) throws IOException { + // get temp directory for cached files + File tempDir = Files.createTempDir(); + Path tempDirPath = tempDir.toPath(); + FlinkCachedArtifactNames cachedArtifactNames = FlinkCachedArtifactNames.createDefault(); + + // store and register manifest + Manifest manifest = artifactSource.getManifest(); + Path manifestPath = tempDirPath.resolve("MANIFEST"); + String manifestHandle = cachedArtifactNames.getManifestHandle(); + try (FileOutputStream fileOutputStream = new FileOutputStream(manifestPath.toFile())) { + manifest.writeTo(fileOutputStream); + } + registerCachedFile(manifestPath.toUri().toString(), manifestHandle); + + // store and register artifacts + for (ArtifactMetadata metadata : manifest.getArtifactList()) { + String artifactName = metadata.getName(); + String artifactHandle = cachedArtifactNames.getArtifactHandle(artifactName); + Path artifactPath = tempDirPath.resolve(artifactHandle); + try (FileOutputStream fileOutputStream = new FileOutputStream(artifactPath.toFile())) { + ArtifactWriter writer = new 
ArtifactWriter(fileOutputStream); + artifactSource.getArtifact(artifactHandle, writer); + writer.result.wait(); + registerCachedFile(artifactPath.toUri().toString(), artifactHandle); + } catch (InterruptedException e) { + LOG.warn("Interrupted while writing artifact with name %s", artifactName); + } + } + + } + + private void registerCachedFile(String fileUri, String name) { + if (flinkBatchEnv != null) { + flinkBatchEnv.registerCachedFile(fileUri, name); + } else if (flinkStreamEnv != null) { + flinkStreamEnv.registerCachedFile(fileUri, name); + } else { + throw new IllegalStateException("The Pipeline has not yet been translated."); + } + } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 5fdcdcec1218..f5e12f2e1c8e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -26,6 +26,8 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import javax.annotation.Nullable; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; @@ -54,6 +56,9 @@ public class FlinkRunner extends PipelineRunner { */ private final FlinkPipelineOptions options; + @Nullable + private ArtifactSource artifactSource; + /** * Construct a runner from the provided options. * @@ -94,6 +99,7 @@ public static FlinkRunner fromOptions(PipelineOptions options) { private FlinkRunner(FlinkPipelineOptions options) { this.options = options; this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>(); + this.artifactSource = null; } @Override @@ -109,6 +115,16 @@ public PipelineResult run(Pipeline pipeline) { LOG.info("Translating pipeline to Flink program."); env.translate(this, pipeline); + if (artifactSource != null) { + LOG.info("Registering pipeline artifacts in Flink program."); + try { + env.loadStagedArtifacts(artifactSource); + } catch (Exception e) { + LOG.error("Artifact registration failed", e); + throw new RuntimeException("Artifact registration failed", e); + } + } + JobExecutionResult result; try { LOG.info("Starting execution of Flink program."); @@ -135,6 +151,11 @@ public PipelineResult run(Pipeline pipeline) { } } + /** Optionally set a source for portability artifacts. */ + public void setArtifactSource(ArtifactSource artifactSource) { + this.artifactSource = artifactSource; + } + /** * For testing. 
*/ diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/FlinkArtifactSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/CachedArtifactSource.java similarity index 76% rename from runners/flink/src/main/java/org/apache/beam/runners/flink/execution/FlinkArtifactSource.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/execution/CachedArtifactSource.java index a7b1f471c7e0..69f2d95b1eb0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/FlinkArtifactSource.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/CachedArtifactSource.java @@ -11,7 +11,7 @@ import java.io.IOException; import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; -import org.apache.beam.runners.flink.FlinkCachedArtifactPaths; +import org.apache.beam.runners.flink.FlinkCachedArtifactNames; import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; import org.apache.flink.api.common.cache.DistributedCache; @@ -19,28 +19,28 @@ * An {@link org.apache.beam.runners.fnexecution.artifact.ArtifactSource} that draws artifacts * from the Flink Distributed File Cache {@link org.apache.flink.api.common.cache.DistributedCache}. */ -public class FlinkArtifactSource implements ArtifactSource { +public class CachedArtifactSource implements ArtifactSource { private static final int DEFAULT_CHUNK_SIZE_BYTES = 2 * 1024 * 1024; - public static FlinkArtifactSource createDefault(DistributedCache cache) { - return new FlinkArtifactSource(cache, FlinkCachedArtifactPaths.createDefault()); + public static CachedArtifactSource createDefault(DistributedCache cache) { + return new CachedArtifactSource(cache, FlinkCachedArtifactNames.createDefault()); } - public static FlinkArtifactSource forToken(DistributedCache cache, String artifactToken) { - return new FlinkArtifactSource(cache, FlinkCachedArtifactPaths.forToken(artifactToken)); + public static CachedArtifactSource forToken(DistributedCache cache, String artifactToken) { + return new CachedArtifactSource(cache, FlinkCachedArtifactNames.forToken(artifactToken)); } private final DistributedCache cache; - private final FlinkCachedArtifactPaths paths; + private final FlinkCachedArtifactNames paths; - private FlinkArtifactSource(DistributedCache cache, FlinkCachedArtifactPaths paths) { + private CachedArtifactSource(DistributedCache cache, FlinkCachedArtifactNames paths) { this.cache = cache; this.paths = paths; } @Override public Manifest getManifest() throws IOException { - String path = paths.getManifestPath(); + String path = paths.getManifestHandle(); File manifest = cache.getFile(path); try (BufferedInputStream fStream = new BufferedInputStream(new FileInputStream(manifest))) { return Manifest.parseFrom(fStream); @@ -50,7 +50,7 @@ public Manifest getManifest() throws IOException { @Override public void getArtifact(String name, StreamObserver responseObserver) { - String path = paths.getArtifactPath(name); + String path = paths.getArtifactHandle(name); File artifact = cache.getFile(path); try (FileInputStream fStream = new FileInputStream(artifact)) { byte[] buffer = new byte[DEFAULT_CHUNK_SIZE_BYTES];