forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add job submission to Flink runner. #15
Open
axelmagn
wants to merge
2
commits into
bsidhom:flink-portable-runner
Choose a base branch
from
axelmagn:topic-job-server
base: flink-portable-runner
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
31 changes: 31 additions & 0 deletions
31
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactNames.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package org.apache.beam.runners.flink; | ||
|
||
/**
 * Determines artifact path names within the
 * {@link org.apache.flink.api.common.cache.DistributedCache}.
 *
 * <p>Handles are namespaced by an artifact token so that artifacts staged for different jobs do
 * not collide within the same distributed cache.
 */
public class FlinkCachedArtifactNames {
  private static final String DEFAULT_ARTIFACT_TOKEN = "default";

  /** Creates an instance that uses the shared default artifact token. */
  public static FlinkCachedArtifactNames createDefault() {
    return new FlinkCachedArtifactNames(DEFAULT_ARTIFACT_TOKEN);
  }

  /**
   * Creates an instance namespaced by the given artifact token.
   *
   * @param artifactToken non-null token identifying the staging session
   */
  public static FlinkCachedArtifactNames forToken(String artifactToken) {
    return new FlinkCachedArtifactNames(artifactToken);
  }

  private final String token;

  private FlinkCachedArtifactNames(String token) {
    // Fail fast: a null token would otherwise surface later as "ARTIFACT_null_..." handles.
    if (token == null) {
      throw new NullPointerException("token");
    }
    this.token = token;
  }

  /** Returns the distributed cache handle under which the named artifact is stored. */
  public String getArtifactHandle(String name) {
    return String.format("ARTIFACT_%s_%s", token, name);
  }

  /** Returns the distributed cache handle under which the artifact manifest is stored. */
  public String getManifestHandle() {
    return String.format("MANIFEST_%s", token);
  }
}
31 changes: 0 additions & 31 deletions
31
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactPaths.java
This file was deleted.
Oops, something went wrong.
88 changes: 88 additions & 0 deletions
88
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package org.apache.beam.runners.flink; | ||
|
||
|
||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.google.common.util.concurrent.ListeningExecutorService; | ||
import io.grpc.stub.StreamObserver; | ||
import javax.annotation.Nullable; | ||
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; | ||
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; | ||
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; | ||
import org.apache.beam.sdk.Pipeline; | ||
import org.apache.beam.sdk.PipelineResult; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Invocation of a Flink Job via {@link FlinkRunner}. | ||
*/ | ||
public class FlinkJobInvocation implements JobInvocation { | ||
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); | ||
|
||
public static FlinkJobInvocation create( | ||
String id, | ||
ListeningExecutorService executorService, | ||
FlinkRunner runner, Pipeline pipeline) { | ||
return new FlinkJobInvocation(id, executorService, runner, pipeline); | ||
} | ||
|
||
private final String id; | ||
private final ListeningExecutorService executorService; | ||
private final FlinkRunner runner; | ||
private final Pipeline pipeline; | ||
|
||
@Nullable | ||
private ListenableFuture<PipelineResult> invocationFuture; | ||
|
||
private FlinkJobInvocation( | ||
String id, | ||
ListeningExecutorService executorService, | ||
FlinkRunner runner, | ||
Pipeline pipeline) { | ||
this.id = id; | ||
this.executorService = executorService; | ||
this.runner = runner; | ||
this.pipeline = pipeline; | ||
this.invocationFuture = null; | ||
} | ||
|
||
@Override | ||
public void start() { | ||
LOG.trace("Starting job invocation {}", getId()); | ||
synchronized (this) { | ||
invocationFuture = executorService.submit(() -> runner.run(pipeline)); | ||
} | ||
} | ||
|
||
@Override | ||
public String getId() { | ||
return id; | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
LOG.trace("Canceling job invocation {}", getId()); | ||
synchronized (this) { | ||
if (this.invocationFuture != null) { | ||
this.invocationFuture.cancel(true /* mayInterruptIfRunning */); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public Enum getState() { | ||
LOG.warn("getState() not yet implemented."); | ||
return Enum.UNSPECIFIED; | ||
} | ||
|
||
@Override | ||
public void addStateObserver(StreamObserver<Enum> stateStreamObserver) { | ||
LOG.warn("addStateObserver() not yet implemented."); | ||
stateStreamObserver.onNext(getState()); | ||
} | ||
|
||
@Override | ||
public void addMessageObserver(StreamObserver<JobMessage> messageStreamObserver) { | ||
LOG.warn("addMessageObserver() not yet implemented."); | ||
} | ||
} |
42 changes: 42 additions & 0 deletions
42
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package org.apache.beam.runners.flink; | ||
|
||
import com.google.common.util.concurrent.ListeningExecutorService; | ||
import java.io.IOException; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import javax.annotation.Nullable; | ||
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; | ||
import org.apache.beam.runners.core.construction.PipelineTranslation; | ||
import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; | ||
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; | ||
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; | ||
import org.apache.beam.runners.fnexecution.jobsubmission.JobPreparation; | ||
import org.apache.beam.sdk.Pipeline; | ||
import org.apache.beam.sdk.options.PipelineOptions; | ||
|
||
/** | ||
* Job Invoker for the {@link FlinkRunner}. | ||
*/ | ||
public class FlinkJobInvoker implements JobInvoker { | ||
public static FlinkJobInvoker create(ListeningExecutorService executorService) { | ||
return new FlinkJobInvoker(executorService); | ||
} | ||
|
||
private final ListeningExecutorService executorService; | ||
|
||
private FlinkJobInvoker(ListeningExecutorService executorService) { | ||
this.executorService = executorService; | ||
} | ||
|
||
@Override | ||
public JobInvocation invoke(JobPreparation preparation, @Nullable String artifactToken) | ||
throws IOException { | ||
String invocationId = | ||
String.format("%s_%d", preparation.id(), ThreadLocalRandom.current().nextInt()); | ||
PipelineOptions options = PipelineOptionsTranslation.fromProto(preparation.options()); | ||
Pipeline pipeline = PipelineTranslation.fromProto(preparation.pipeline()); | ||
FlinkRunner runner = FlinkRunner.fromOptions(options); | ||
ArtifactSource artifactSource = preparation.stagingService().getService().createAccessor(); | ||
runner.setArtifactSource(artifactSource); | ||
return FlinkJobInvocation.create(invocationId, executorService, runner, pipeline); | ||
} | ||
} |
134 changes: 134 additions & 0 deletions
134
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package org.apache.beam.runners.flink; | ||
|
||
import com.google.common.util.concurrent.ListeningExecutorService; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ThreadFactory; | ||
import org.apache.beam.artifact.local.LocalFileSystemArtifactStagerService; | ||
import org.apache.beam.model.pipeline.v1.Endpoints; | ||
import org.apache.beam.runners.fnexecution.GrpcFnServer; | ||
import org.apache.beam.runners.fnexecution.ServerFactory; | ||
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; | ||
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; | ||
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; | ||
import org.apache.beam.runners.fnexecution.jobsubmission.JobService; | ||
import org.kohsuke.args4j.CmdLineException; | ||
import org.kohsuke.args4j.CmdLineParser; | ||
import org.kohsuke.args4j.Option; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
/** Driver program that starts a job server. */ | ||
public class FlinkJobServerDriver implements Runnable { | ||
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class); | ||
|
||
private static class ServerConfiguration { | ||
@Option( | ||
name = "--job-host", | ||
required = true, | ||
usage = "The job server host string" | ||
) | ||
private String host = ""; | ||
|
||
@Option( | ||
name = "--artifacts-dir", | ||
usage = "The location to store staged artifact files" | ||
) | ||
private String artifactStagingPath = "/tmp/beam-artifact-staging"; | ||
} | ||
|
||
public static void main(String[] args) { | ||
ServerConfiguration configuration = new ServerConfiguration(); | ||
CmdLineParser parser = new CmdLineParser(configuration); | ||
try { | ||
parser.parseArgument(args); | ||
} catch (CmdLineException e) { | ||
e.printStackTrace(System.err); | ||
printUsage(parser); | ||
return; | ||
} | ||
FlinkJobServerDriver driver = fromConfig(configuration); | ||
driver.run(); | ||
} | ||
|
||
private static void printUsage(CmdLineParser parser) { | ||
System.err.println( | ||
String.format( | ||
"Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName())); | ||
parser.printUsage(System.err); | ||
System.err.println(); | ||
} | ||
|
||
public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration) { | ||
ThreadFactory threadFactory = | ||
new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build(); | ||
ListeningExecutorService executor = | ||
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory)); | ||
ServerFactory serverFactory = ServerFactory.createDefault(); | ||
return create(configuration, executor, serverFactory); | ||
} | ||
|
||
public static FlinkJobServerDriver create( | ||
ServerConfiguration configuration, | ||
ListeningExecutorService executor, | ||
ServerFactory serverFactory) { | ||
return new FlinkJobServerDriver(configuration, executor, serverFactory); | ||
} | ||
|
||
private final ListeningExecutorService executor; | ||
private final ServerConfiguration configuration; | ||
private final ServerFactory serverFactory; | ||
|
||
private FlinkJobServerDriver( | ||
ServerConfiguration configuration, | ||
ListeningExecutorService executor, | ||
ServerFactory serverFactory) { | ||
this.configuration = configuration; | ||
this.executor = executor; | ||
this.serverFactory = serverFactory; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
GrpcFnServer<JobService> server = createJobServer(); | ||
server.getServer().awaitTermination(); | ||
} catch (InterruptedException e) { | ||
LOG.warn("Job server interrupted", e); | ||
} catch (Exception e) { | ||
LOG.warn("Exception during job server creation", e); | ||
} | ||
} | ||
|
||
private GrpcFnServer<JobService> createJobServer() throws IOException { | ||
JobService service = createJobService(); | ||
Endpoints.ApiServiceDescriptor descriptor = | ||
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build(); | ||
return GrpcFnServer.create(service, descriptor, serverFactory); | ||
} | ||
|
||
private JobService createJobService() { | ||
ArtifactStagingServiceProvider artifactStagingServiceProvider = | ||
createArtifactStagingServiceProvider(); | ||
JobInvoker invoker = createJobInvoker(); | ||
return JobService.create(artifactStagingServiceProvider, invoker); | ||
} | ||
|
||
private ArtifactStagingServiceProvider createArtifactStagingServiceProvider() { | ||
return jobPreparationId -> { | ||
Path location = Paths.get(configuration.artifactStagingPath).resolve(jobPreparationId); | ||
ArtifactStagingService service = | ||
LocalFileSystemArtifactStagerService.withRootDirectory(location.toFile()); | ||
return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory); | ||
}; | ||
} | ||
|
||
private JobInvoker createJobInvoker() { | ||
return FlinkJobInvoker.create(executor); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just throwing out some ideas:
I managed to make some changes to the Flink `ClusterClient` interface. This should make it easier to implement the methods of this: https://github.com/apache/flink/blob/10d52f268db3eda7ee1511ea30afb9a982644148/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java. Notable are:
- `submitJob(JobGraph, ClassLoader)`, which is non-blocking if you call `setDetached(true)` before
- `cancel(JobID)`
- `getJobStatus(JobID)`
A `JobGraph` will have a random `JobID`; you can get it from the `JobGraph` via `getJobID()`. For streaming you can get the `JobGraph` via `StreamExecutionEnvironment.getStreamGraph().getJobGraph()`. For batch it's a bit more involved, but a starting point is this: https://github.com/apache/flink/blob/10d52f268db3eda7ee1511ea30afb9a982644148/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L890. In general those APIs are somewhat internal and they are also somewhat interesting... 😉
. For batch it's a bit more involved but a starting point is this: https://github.com/apache/flink/blob/10d52f268db3eda7ee1511ea30afb9a982644148/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L890.In general those APIs are somewhat internal and they are also somewhat interesting... 😉