Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job submission to Flink runner. #15

Open
wants to merge 2 commits into
base: flink-portable-runner
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions runners/flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ dependencies {
shadow project(path: ":runners:core-java", configuration: "shadow")
shadow project(path: ":runners:core-construction-java", configuration: "shadow")
shadow project(path: ":runners:java-fn-execution", configuration: "shadow")
shadow project(path: ":runners:local-artifact-service-java", configuration: "shadow")
shadow library.java.jackson_annotations
shadow library.java.findbugs_jsr305
shadow library.java.slf4j_api
shadow library.java.joda_time
shadow library.java.commons_compress
shadow library.java.args4j
shadow "org.apache.flink:flink-clients_2.11:$flink_version"
shadow "org.apache.flink:flink-core:$flink_version"
shadow "org.apache.flink:flink-metrics-core:$flink_version"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.apache.beam.runners.flink;

/**
 * Determines artifact path names within the
 * {@link org.apache.flink.api.common.cache.DistributedCache}.
 *
 * <p>All handles are namespaced by a per-job artifact token so that artifacts of
 * different jobs staged into the same cache do not collide.
 */
public class FlinkCachedArtifactNames {
  private static final String DEFAULT_ARTIFACT_TOKEN = "default";

  /** Creates an instance that uses the shared default artifact token. */
  public static FlinkCachedArtifactNames createDefault() {
    return new FlinkCachedArtifactNames(DEFAULT_ARTIFACT_TOKEN);
  }

  /**
   * Creates an instance scoped to the given artifact token.
   *
   * @param artifactToken token that namespaces all produced handles; must not be null
   * @throws NullPointerException if {@code artifactToken} is null
   */
  public static FlinkCachedArtifactNames forToken(String artifactToken) {
    return new FlinkCachedArtifactNames(artifactToken);
  }

  private final String token;

  private FlinkCachedArtifactNames(String token) {
    // Fail fast: a null token would otherwise silently produce handles such as
    // "MANIFEST_null" and only surface as a cache miss much later.
    if (token == null) {
      throw new NullPointerException("token");
    }
    this.token = token;
  }

  /**
   * Returns the distributed-cache handle for the artifact with the given name.
   *
   * @param name artifact name as it appears in the staging manifest
   */
  public String getArtifactHandle(String name) {
    return String.format("ARTIFACT_%s_%s", token, name);
  }

  /** Returns the distributed-cache handle under which the artifact manifest is stored. */
  public String getManifestHandle() {
    return String.format("MANIFEST_%s", token);
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package org.apache.beam.runners.flink;


import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.grpc.stub.StreamObserver;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Invocation of a Flink Job via {@link FlinkRunner}.
*/
public class FlinkJobInvocation implements JobInvocation {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just throwing out some ideas:

I managed to make some changes to the Flink ClusterClient interface. This should make it easier to implement the methods of this: https://github.com/apache/flink/blob/10d52f268db3eda7ee1511ea30afb9a982644148/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java. Notable are:

  • submitJob(JobGraph, ClassLoader), which is non-blocking if you call setDetached(true) beforehand
  • cancel(JobID)
  • getJobStatus(JobID)

A JobGraph will have a random JobID, you can get it from the JobGraph via getJobID(). For streaming you can get the JobGraph via StreamExecutionEnvironment.getStreamGraph().getJobGraph(). For batch it's a bit more involved but a starting point is this: https://github.com/apache/flink/blob/10d52f268db3eda7ee1511ea30afb9a982644148/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L890.

In general those APIs are somewhat internal and they are also somewhat interesting... 😉

// SLF4J logger used for lifecycle tracing of this invocation.
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);

/**
 * Creates a new invocation of {@code pipeline} with the given id.
 *
 * <p>The invocation is inert until {@link #start()} is called; the pipeline run
 * is then submitted to {@code executorService}.
 */
public static FlinkJobInvocation create(
String id,
ListeningExecutorService executorService,
FlinkRunner runner, Pipeline pipeline) {
return new FlinkJobInvocation(id, executorService, runner, pipeline);
}

// Unique id of this invocation, reported via getId().
private final String id;
// Executor on which the pipeline run is submitted by start().
private final ListeningExecutorService executorService;
// Runner that executes the pipeline when start() is called.
private final FlinkRunner runner;
// The pipeline to execute.
private final Pipeline pipeline;

// Future of the in-flight pipeline run; null until start() is called.
// Reads and writes are guarded by synchronizing on "this".
@Nullable
private ListenableFuture<PipelineResult> invocationFuture;

private FlinkJobInvocation(
String id,
ListeningExecutorService executorService,
FlinkRunner runner,
Pipeline pipeline) {
this.id = id;
this.executorService = executorService;
this.runner = runner;
this.pipeline = pipeline;
this.invocationFuture = null;
}

/**
 * Submits the pipeline run to the executor and records the resulting future.
 *
 * <p>NOTE(review): calling {@code start()} a second time replaces the previous
 * future without cancelling it — confirm whether the {@code JobInvocation}
 * contract forbids restarting, and guard against it if so.
 */
@Override
public void start() {
LOG.trace("Starting job invocation {}", getId());
synchronized (this) {
invocationFuture = executorService.submit(() -> runner.run(pipeline));
}
}

/** Returns the unique identifier assigned to this invocation at creation time. */
@Override
public String getId() {
  return this.id;
}

/**
 * Cancels the in-flight pipeline run, interrupting it if necessary.
 *
 * <p>A no-op when the invocation has not been started.
 */
@Override
public void cancel() {
  LOG.trace("Canceling job invocation {}", getId());
  synchronized (this) {
    ListenableFuture<PipelineResult> running = this.invocationFuture;
    if (running != null) {
      running.cancel(true /* mayInterruptIfRunning */);
    }
  }
}

/**
 * Reports the current job state.
 *
 * <p>Stub: always returns {@code UNSPECIFIED} and logs a warning; the real
 * state of the underlying Flink job is not yet queried.
 */
@Override
public Enum getState() {
LOG.warn("getState() not yet implemented.");
return Enum.UNSPECIFIED;
}

/**
 * Registers an observer for job state changes.
 *
 * <p>Stub: emits only the current state (always {@code UNSPECIFIED}) once; no
 * further updates are delivered and the observer is never completed.
 */
@Override
public void addStateObserver(StreamObserver<Enum> stateStreamObserver) {
LOG.warn("addStateObserver() not yet implemented.");
stateStreamObserver.onNext(getState());
}

/**
 * Registers an observer for job messages.
 *
 * <p>Stub: the observer is accepted but never receives any messages.
 */
@Override
public void addMessageObserver(StreamObserver<JobMessage> messageStreamObserver) {
LOG.warn("addMessageObserver() not yet implemented.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.beam.runners.flink;

import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.JobPreparation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * Job Invoker for the {@link FlinkRunner}.
 */
public class FlinkJobInvoker implements JobInvoker {

  /** Creates an invoker that submits jobs on the given executor. */
  public static FlinkJobInvoker create(ListeningExecutorService executorService) {
    return new FlinkJobInvoker(executorService);
  }

  private final ListeningExecutorService executorService;

  private FlinkJobInvoker(ListeningExecutorService executorService) {
    this.executorService = executorService;
  }

  /**
   * Materializes the prepared job into a {@link FlinkJobInvocation} backed by a
   * freshly configured {@link FlinkRunner}.
   */
  @Override
  public JobInvocation invoke(JobPreparation preparation, @Nullable String artifactToken)
      throws IOException {
    // Rehydrate the pipeline and its options from their portable proto forms.
    Pipeline portablePipeline = PipelineTranslation.fromProto(preparation.pipeline());
    PipelineOptions pipelineOptions = PipelineOptionsTranslation.fromProto(preparation.options());
    FlinkRunner flinkRunner = FlinkRunner.fromOptions(pipelineOptions);
    // Point the runner at the artifacts staged for this preparation.
    ArtifactSource stagedArtifacts = preparation.stagingService().getService().createAccessor();
    flinkRunner.setArtifactSource(stagedArtifacts);
    // Suffix with a random int so repeated invocations of one preparation stay distinct.
    String invocationId =
        String.format("%s_%d", preparation.id(), ThreadLocalRandom.current().nextInt());
    return FlinkJobInvocation.create(invocationId, executorService, flinkRunner, portablePipeline);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package org.apache.beam.runners.flink;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.beam.artifact.local.LocalFileSystemArtifactStagerService;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.JobService;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/** Driver program that starts a job server. */
public class FlinkJobServerDriver implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);

  /** args4j-backed command line configuration for the job server. */
  private static class ServerConfiguration {
    @Option(
        name = "--job-host",
        required = true,
        usage = "The job server host string"
    )
    private String host = "";

    @Option(
        name = "--artifacts-dir",
        usage = "The location to store staged artifact files"
    )
    private String artifactStagingPath = "/tmp/beam-artifact-staging";
  }

  /** Entry point: parses arguments, then runs the server until termination. */
  public static void main(String[] args) {
    ServerConfiguration configuration = new ServerConfiguration();
    CmdLineParser parser = new CmdLineParser(configuration);
    try {
      parser.parseArgument(args);
    } catch (CmdLineException e) {
      e.printStackTrace(System.err);
      printUsage(parser);
      return;
    }
    FlinkJobServerDriver driver = fromConfig(configuration);
    driver.run();
  }

  private static void printUsage(CmdLineParser parser) {
    System.err.println(
        String.format(
            "Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName()));
    parser.printUsage(System.err);
    System.err.println();
  }

  /** Builds a driver with a default daemon-thread executor and gRPC server factory. */
  public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration) {
    ThreadFactory threadFactory =
        new ThreadFactoryBuilder()
            // The %d placeholder gives each pooled thread a distinct name; without
            // it every thread created by this factory shares the identical name,
            // which makes thread dumps and logs hard to read.
            .setNameFormat("flink-runner-job-server-thread-%d")
            .setDaemon(true)
            .build();
    ListeningExecutorService executor =
        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
    ServerFactory serverFactory = ServerFactory.createDefault();
    return create(configuration, executor, serverFactory);
  }

  public static FlinkJobServerDriver create(
      ServerConfiguration configuration,
      ListeningExecutorService executor,
      ServerFactory serverFactory) {
    return new FlinkJobServerDriver(configuration, executor, serverFactory);
  }

  private final ListeningExecutorService executor;
  private final ServerConfiguration configuration;
  private final ServerFactory serverFactory;

  private FlinkJobServerDriver(
      ServerConfiguration configuration,
      ListeningExecutorService executor,
      ServerFactory serverFactory) {
    this.configuration = configuration;
    this.executor = executor;
    this.serverFactory = serverFactory;
  }

  /** Starts the job service and blocks until the server terminates. */
  @Override
  public void run() {
    try {
      GrpcFnServer<JobService> server = createJobServer();
      server.getServer().awaitTermination();
    } catch (InterruptedException e) {
      LOG.warn("Job server interrupted", e);
      // Restore the interrupt flag so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      // If creation fails the server never comes up at all, so this is an error,
      // not a mere warning.
      LOG.error("Exception during job server creation", e);
    }
  }

  /** Creates the gRPC job service bound to the configured host endpoint. */
  private GrpcFnServer<JobService> createJobServer() throws IOException {
    JobService service = createJobService();
    Endpoints.ApiServiceDescriptor descriptor =
        Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
    return GrpcFnServer.create(service, descriptor, serverFactory);
  }

  /** Wires the staging-service provider and job invoker into a JobService. */
  private JobService createJobService() {
    ArtifactStagingServiceProvider artifactStagingServiceProvider =
        createArtifactStagingServiceProvider();
    JobInvoker invoker = createJobInvoker();
    return JobService.create(artifactStagingServiceProvider, invoker);
  }

  /**
   * Returns a provider that stages artifacts for each job preparation under a
   * per-preparation subdirectory of the configured staging path, served on a
   * freshly allocated port.
   */
  private ArtifactStagingServiceProvider createArtifactStagingServiceProvider() {
    return jobPreparationId -> {
      Path location = Paths.get(configuration.artifactStagingPath).resolve(jobPreparationId);
      ArtifactStagingService service =
          LocalFileSystemArtifactStagerService.withRootDirectory(location.toFile());
      return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
    };
  }

  /** Returns the Flink job invoker backed by this driver's executor. */
  private JobInvoker createJobInvoker() {
    return FlinkJobInvoker.create(executor);
  }
}
Loading